From 3d4c416d00c22e48a42e7f7e6e0abd150e76ffee Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 15 Oct 2024 10:44:01 +0800 Subject: [PATCH 01/15] client: move option to option.go (#8699) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 118 ----------------------------------------------- client/option.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 118 deletions(-) diff --git a/client/client.go b/client/client.go index 3faa3a09215..27952df13cd 100644 --- a/client/client.go +++ b/client/client.go @@ -36,7 +36,6 @@ import ( "github.com/tikv/pd/client/tlsutil" "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" - "google.golang.org/grpc" ) const ( @@ -174,69 +173,6 @@ type Client interface { Close() } -// GetStoreOp represents available options when getting stores. -type GetStoreOp struct { - excludeTombstone bool -} - -// GetStoreOption configures GetStoreOp. -type GetStoreOption func(*GetStoreOp) - -// WithExcludeTombstone excludes tombstone stores from the result. -func WithExcludeTombstone() GetStoreOption { - return func(op *GetStoreOp) { op.excludeTombstone = true } -} - -// RegionsOp represents available options when operate regions -type RegionsOp struct { - group string - retryLimit uint64 - skipStoreLimit bool -} - -// RegionsOption configures RegionsOp -type RegionsOption func(op *RegionsOp) - -// WithGroup specify the group during Scatter/Split Regions -func WithGroup(group string) RegionsOption { - return func(op *RegionsOp) { op.group = group } -} - -// WithRetry specify the retry limit during Scatter/Split Regions -func WithRetry(retry uint64) RegionsOption { - return func(op *RegionsOp) { op.retryLimit = retry } -} - -// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions -func WithSkipStoreLimit() RegionsOption { - return func(op *RegionsOp) { op.skipStoreLimit = true } -} - -// GetRegionOp represents available options when getting regions. -type GetRegionOp struct { - needBuckets bool - allowFollowerHandle bool - outputMustContainAllKeyRange bool -} - -// GetRegionOption configures GetRegionOp. -type GetRegionOption func(op *GetRegionOp) - -// WithBuckets means getting region and its buckets. -func WithBuckets() GetRegionOption { - return func(op *GetRegionOp) { op.needBuckets = true } -} - -// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. -func WithAllowFollowerHandle() GetRegionOption { - return func(op *GetRegionOp) { op.allowFollowerHandle = true } -} - -// WithOutputMustContainAllKeyRange means the output must contain all key ranges. -func WithOutputMustContainAllKeyRange() GetRegionOption { - return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } -} - var ( // errUnmatchedClusterID is returned when found a PD with a different cluster ID. errUnmatchedClusterID = errors.New("[pd] unmatched cluster id") @@ -250,60 +186,6 @@ var ( errNoServiceModeReturned = errors.New("[pd] no service mode returned") ) -// ClientOption configures client. -type ClientOption func(c *client) - -// WithGRPCDialOptions configures the client with gRPC dial options. -func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { - return func(c *client) { - c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) - } -} - -// WithCustomTimeoutOption configures the client with timeout option. 
-func WithCustomTimeoutOption(timeout time.Duration) ClientOption { - return func(c *client) { - c.option.timeout = timeout - } -} - -// WithForwardingOption configures the client with forwarding option. -func WithForwardingOption(enableForwarding bool) ClientOption { - return func(c *client) { - c.option.enableForwarding = enableForwarding - } -} - -// WithTSOServerProxyOption configures the client to use TSO server proxy, -// i.e., the client will send TSO requests to the API leader (the TSO server -// proxy) which will forward the requests to the TSO servers. -func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { - return func(c *client) { - c.option.useTSOServerProxy = useTSOServerProxy - } -} - -// WithMaxErrorRetry configures the client max retry times when connect meets error. -func WithMaxErrorRetry(count int) ClientOption { - return func(c *client) { - c.option.maxRetryTimes = count - } -} - -// WithMetricsLabels configures the client with metrics labels. -func WithMetricsLabels(labels prometheus.Labels) ClientOption { - return func(c *client) { - c.option.metricsLabels = labels - } -} - -// WithInitMetricsOption configures the client with metrics labels. -func WithInitMetricsOption(initMetrics bool) ClientOption { - return func(c *client) { - c.option.initMetrics = initMetrics - } -} - var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. diff --git a/client/option.go b/client/option.go index 3f2b7119b52..ca21dcfefbf 100644 --- a/client/option.go +++ b/client/option.go @@ -142,3 +142,120 @@ func (o *option) setTSOClientRPCConcurrency(value int) { func (o *option) getTSOClientRPCConcurrency() int { return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) } + +// GetStoreOp represents available options when getting stores. +type GetStoreOp struct { + excludeTombstone bool +} + +// GetStoreOption configures GetStoreOp. +type GetStoreOption func(*GetStoreOp) + +// WithExcludeTombstone excludes tombstone stores from the result. +func WithExcludeTombstone() GetStoreOption { + return func(op *GetStoreOp) { op.excludeTombstone = true } +} + +// RegionsOp represents available options when operate regions +type RegionsOp struct { + group string + retryLimit uint64 + skipStoreLimit bool +} + +// RegionsOption configures RegionsOp +type RegionsOption func(op *RegionsOp) + +// WithGroup specify the group during Scatter/Split Regions +func WithGroup(group string) RegionsOption { + return func(op *RegionsOp) { op.group = group } +} + +// WithRetry specify the retry limit during Scatter/Split Regions +func WithRetry(retry uint64) RegionsOption { + return func(op *RegionsOp) { op.retryLimit = retry } +} + +// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions +func WithSkipStoreLimit() RegionsOption { + return func(op *RegionsOp) { op.skipStoreLimit = true } +} + +// GetRegionOp represents available options when getting regions. +type GetRegionOp struct { + needBuckets bool + allowFollowerHandle bool + outputMustContainAllKeyRange bool +} + +// GetRegionOption configures GetRegionOp. +type GetRegionOption func(op *GetRegionOp) + +// WithBuckets means getting region and its buckets. +func WithBuckets() GetRegionOption { + return func(op *GetRegionOp) { op.needBuckets = true } +} + +// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. 
+func WithAllowFollowerHandle() GetRegionOption { + return func(op *GetRegionOp) { op.allowFollowerHandle = true } +} + +// WithOutputMustContainAllKeyRange means the output must contain all key ranges. +func WithOutputMustContainAllKeyRange() GetRegionOption { + return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } +} + +// ClientOption configures client. +type ClientOption func(c *client) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(c *client) { + c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) + } +} + +// WithCustomTimeoutOption configures the client with timeout option. +func WithCustomTimeoutOption(timeout time.Duration) ClientOption { + return func(c *client) { + c.option.timeout = timeout + } +} + +// WithForwardingOption configures the client with forwarding option. +func WithForwardingOption(enableForwarding bool) ClientOption { + return func(c *client) { + c.option.enableForwarding = enableForwarding + } +} + +// WithTSOServerProxyOption configures the client to use TSO server proxy, +// i.e., the client will send TSO requests to the API leader (the TSO server +// proxy) which will forward the requests to the TSO servers. +func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { + return func(c *client) { + c.option.useTSOServerProxy = useTSOServerProxy + } +} + +// WithMaxErrorRetry configures the client max retry times when connect meets error. +func WithMaxErrorRetry(count int) ClientOption { + return func(c *client) { + c.option.maxRetryTimes = count + } +} + +// WithMetricsLabels configures the client with metrics labels. +func WithMetricsLabels(labels prometheus.Labels) ClientOption { + return func(c *client) { + c.option.metricsLabels = labels + } +} + +// WithInitMetricsOption configures the client with metrics labels. +func WithInitMetricsOption(initMetrics bool) ClientOption { + return func(c *client) { + c.option.initMetrics = initMetrics + } +} From b70107ec31e65736db6fc71bc32177b901217ec7 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 16 Oct 2024 14:49:47 +0800 Subject: [PATCH 02/15] client/tso: fix the issue where the TSO follower proxy cannot be closed (#8719) close tikv/pd#8709 Remove outdated follower connections after disabling the TSO follower proxy. Signed-off-by: JmPotato --- client/tso_client.go | 6 +++- tests/integrations/client/client_test.go | 36 +++++++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index f1538a7f164..6801aee3a11 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -386,7 +386,9 @@ func (c *tsoClient) tryConnectToTSO( cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { // Only store the `connectionCtx` if it does not exist before. - connectionCtxs.LoadOrStore(newURL, connectionCtx) + if connectionCtx != nil { + connectionCtxs.LoadOrStore(newURL, connectionCtx) + } // Remove all other `connectionCtx`s. connectionCtxs.Range(func(url, cc any) bool { if url.(string) != newURL { @@ -405,6 +407,8 @@ func (c *tsoClient) tryConnectToTSO( c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) if _, ok := connectionCtxs.Load(url); ok { + // Just trigger the clean up of the stale connection contexts. 
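+			// Passing a nil connection context makes updateAndClear skip the LoadOrStore
+			// and only remove the stale contexts whose URL differs from the current one.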
+ updateAndClear(url, nil) return nil } if cc != nil { diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9f0b5f8d523..9574918a74a 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -361,7 +361,8 @@ func TestTSOFollowerProxy(t *testing.T) { defer cli1.Close() cli2 := setupCli(ctx, re, endpoints) defer cli2.Close() - cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + re.NoError(err) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber) @@ -385,6 +386,39 @@ func TestTSOFollowerProxy(t *testing.T) { }() } wg.Wait() + + // Disable the follower proxy and check if the stream is updated. + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false) + re.NoError(err) + + wg.Add(tsoRequestConcurrencyNumber) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var lastTS uint64 + for i := 0; i < tsoRequestRound; i++ { + physical, logical, err := cli2.GetTS(context.Background()) + if err != nil { + // It can only be the context canceled error caused by the stale stream cleanup. + re.ErrorContains(err, "context canceled") + continue + } + re.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + // After requesting with the follower proxy, request with the leader directly. + physical, logical, err = cli1.GetTS(context.Background()) + re.NoError(err) + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + // Ensure at least one request is successful. + re.NotEmpty(lastTS) + }() + } + wg.Wait() } func TestTSOFollowerProxyWithTSOService(t *testing.T) { From 48db925e171c9244e101b55d021721655891bfc1 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 16 Oct 2024 15:43:52 +0800 Subject: [PATCH 03/15] syncer: make log clearer when load region (#8718) close tikv/pd#8717 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/syncer/client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index a94f5c41f3f..bf7f91cfc60 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -104,9 +104,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Info("region syncer start load region") start := time.Now() err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion) - log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start))) if err != nil { - log.Warn("failed to load regions", errs.ZapError(err)) + log.Warn("region syncer failed to load regions", errs.ZapError(err), zap.Duration("time-cost", time.Since(start))) + } else { + log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start))) } // establish client. 
conn := grpcutil.CreateClientConn(ctx, addr, s.tlsConfig, From 26123dc7518b74af2b307ba2dbab89e51798cf7e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 16 Oct 2024 16:55:20 +0800 Subject: [PATCH 04/15] checker, statistic: avoid leak in label statistic (#8703) close tikv/pd#8700 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/cluster/cluster.go | 2 +- pkg/mcs/scheduling/server/cluster.go | 3 +-- pkg/statistics/region_collection.go | 24 +++++++++++++---- server/cluster/cluster_test.go | 36 ++++++++++++++++++++++--- server/cluster/scheduling_controller.go | 1 + 5 files changed, 55 insertions(+), 11 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b1abe4677df..815e04f5c54 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -68,7 +68,7 @@ func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo) if c.GetRegionStats() != nil { c.GetRegionStats().ClearDefunctRegion(item.GetID()) } - c.GetLabelStats().ClearDefunctRegion(item.GetID()) + c.GetLabelStats().MarkDefunctRegion(item.GetID()) c.GetRuleManager().InvalidCache(item.GetID()) } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 0dcb26a1a1f..5885a9cdb84 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -379,13 +379,12 @@ func (c *Cluster) waitSchedulersInitialized() { } } -// TODO: implement the following methods - // UpdateRegionsLabelLevelStats updates the status of the region label level by types. func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) } + c.labelStats.ClearDefunctRegions() } func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 30197dd43ea..7e51a8a7bdd 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -365,6 +365,7 @@ type LabelStatistics struct { syncutil.RWMutex regionLabelStats map[uint64]string labelCounter map[string]int + defunctRegions map[uint64]struct{} } // NewLabelStatistics creates a new LabelStatistics. @@ -372,6 +373,7 @@ func NewLabelStatistics() *LabelStatistics { return &LabelStatistics{ regionLabelStats: make(map[uint64]string), labelCounter: make(map[string]int), + defunctRegions: make(map[uint64]struct{}), } } @@ -405,14 +407,26 @@ func ResetLabelStatsMetrics() { regionLabelLevelGauge.Reset() } -// ClearDefunctRegion is used to handle the overlap region. -func (l *LabelStatistics) ClearDefunctRegion(regionID uint64) { +// MarkDefunctRegion is used to handle the overlap region. +// It is used to mark the region as defunct and remove it from the label statistics later. +func (l *LabelStatistics) MarkDefunctRegion(regionID uint64) { l.Lock() defer l.Unlock() - if label, ok := l.regionLabelStats[regionID]; ok { - l.labelCounter[label]-- - delete(l.regionLabelStats, regionID) + l.defunctRegions[regionID] = struct{}{} +} + +// ClearDefunctRegions is used to handle the overlap region. +// It is used to remove the defunct regions from the label statistics. 
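+// Callers should invoke it after Observe has refreshed the statistics of the
+// surviving regions, as UpdateRegionsLabelLevelStats does in this change.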
+func (l *LabelStatistics) ClearDefunctRegions() { + l.Lock() + defer l.Unlock() + for regionID := range l.defunctRegions { + if label, ok := l.regionLabelStats[regionID]; ok { + l.labelCounter[label]-- + delete(l.regionLabelStats, regionID) + } } + l.defunctRegions = make(map[uint64]struct{}) } // GetLabelCounter is only used for tests. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index fa1b14d107e..a37f9718bdb 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1125,6 +1125,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { opt.SetReplicationConfig(cfg) re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) for i := uint64(1); i <= 4; i++ { var labels []*metapb.StoreLabel @@ -1159,13 +1160,42 @@ func TestRegionLabelIsolationLevel(t *testing.T) { StartKey: []byte{byte(1)}, EndKey: []byte{byte(2)}, } - r := core.NewRegionInfo(region, peers[0]) - re.NoError(cluster.putRegion(r)) + r1 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r1)) - cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r}) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1}) counter := cluster.labelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) + + region = &metapb.Region{ + Id: 10, + Peers: peers, + StartKey: []byte{byte(2)}, + EndKey: []byte{byte(3)}, + } + r2 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r2)) + + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r2}) + counter = cluster.labelStats.GetLabelCounter() + re.Equal(0, counter["none"]) + re.Equal(2, counter["zone"]) + + // issue: https://github.com/tikv/pd/issues/8700 + // step1: heartbeat a overlap region, which is used to simulate the case that the region is merged. + // step2: update region 9 and region 10, which is used to simulate the case that patrol is triggered. + // We should only count region 9. 
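+	// The overlapping region keeps region 9's ID but covers both key ranges, so the
+	// heartbeat marks region 10 as defunct and the next stats update drops it.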
+ overlapRegion := r1.Clone( + core.WithStartKey(r1.GetStartKey()), + core.WithEndKey(r2.GetEndKey()), + core.WithLeader(r2.GetPeer(8)), + ) + re.NoError(cluster.HandleRegionHeartbeat(overlapRegion)) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1, r2}) + counter = cluster.labelStats.GetLabelCounter() + re.Equal(0, counter["none"]) + re.Equal(1, counter["zone"]) } func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 5d617700804..8578b3480d8 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -236,6 +236,7 @@ func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.Reg for _, region := range regions { sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) } + sc.labelStats.ClearDefunctRegions() } func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { From f0c84e49b4aea4104ff4151b2abdc9712a6b2db7 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 16 Oct 2024 17:45:26 +0800 Subject: [PATCH 05/15] client: reuse `initRetry` function (#8707) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 39 +++++++-------------------------- client/pd_service_discovery.go | 32 +++++++++++++-------------- client/tso_service_discovery.go | 8 +++---- 3 files changed, 28 insertions(+), 51 deletions(-) diff --git a/client/client.go b/client/client.go index 27952df13cd..f8c8d32cee8 100644 --- a/client/client.go +++ b/client/client.go @@ -442,6 +442,7 @@ func newClientWithKeyspaceName( } clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ + keyspaceID: nullKeyspaceID, updateTokenConnectionCh: make(chan struct{}, 1), ctx: clientCtx, cancel: clientCancel, @@ -455,10 +456,12 @@ func newClientWithKeyspaceName( opt(c) } - updateKeyspaceIDCb := func() error { - if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { + updateKeyspaceIDFunc := func() error { + keyspaceMeta, err := c.LoadKeyspace(clientCtx, keyspaceName) + if err != nil { return err } + c.keyspaceID = keyspaceMeta.GetId() // c.keyspaceID is the source of truth for keyspace id. c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) return nil @@ -466,8 +469,8 @@ func newClientWithKeyspaceName( // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, // finally update the keyspace id to the PD service discovery for the following interactions. 
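 	// The keyspace ID lookup is retried inside the service discovery's Init via
 	// initRetry, so no separate retry wrapper is needed in the client anymore.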
- c.pdSvcDiscovery = newPDServiceDiscovery( - clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) + c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, + c.setServiceMode, updateKeyspaceIDFunc, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() if c.pdSvcDiscovery != nil { @@ -482,32 +485,6 @@ func newClientWithKeyspaceName( return c, nil } -func (c *client) initRetry(f func(s string) error, str string) error { - var err error - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for i := 0; i < c.option.maxRetryTimes; i++ { - if err = f(str); err == nil { - return nil - } - select { - case <-c.ctx.Done(): - return err - case <-ticker.C: - } - } - return errors.WithStack(err) -} - -func (c *client) loadKeyspaceMeta(keyspace string) error { - keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace) - if err != nil { - return err - } - c.keyspaceID = keyspaceMeta.GetId() - return nil -} - func (c *client) setup() error { // Init the metrics. if c.option.initMetrics { @@ -579,7 +556,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = newTSOServiceDiscovery( c.ctx, MetaStorageClient(c), c.pdSvcDiscovery, - c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option) + c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. newTSOCli = newTSOClient(c.ctx, c.option, diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index c34a5bebac6..f42ae7fea4a 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -440,9 +440,9 @@ type pdServiceDiscovery struct { cancel context.CancelFunc closeOnce sync.Once - updateKeyspaceIDCb updateKeyspaceIDFunc - keyspaceID uint32 - tlsCfg *tls.Config + updateKeyspaceIDFunc updateKeyspaceIDFunc + keyspaceID uint32 + tlsCfg *tls.Config // Client option. option *option } @@ -461,21 +461,21 @@ func newPDServiceDiscovery( ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, serviceModeUpdateCb func(pdpb.ServiceMode), - updateKeyspaceIDCb updateKeyspaceIDFunc, + updateKeyspaceIDFunc updateKeyspaceIDFunc, keyspaceID uint32, urls []string, tlsCfg *tls.Config, option *option, ) *pdServiceDiscovery { pdsd := &pdServiceDiscovery{ - checkMembershipCh: make(chan struct{}, 1), - ctx: ctx, - cancel: cancel, - wg: wg, - apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, - serviceModeUpdateCb: serviceModeUpdateCb, - updateKeyspaceIDCb: updateKeyspaceIDCb, - keyspaceID: keyspaceID, - tlsCfg: tlsCfg, - option: option, + checkMembershipCh: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + wg: wg, + apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, + serviceModeUpdateCb: serviceModeUpdateCb, + updateKeyspaceIDFunc: updateKeyspaceIDFunc, + keyspaceID: keyspaceID, + tlsCfg: tlsCfg, + option: option, } urls = addrsToURLs(urls, tlsCfg) pdsd.urls.Store(urls) @@ -500,8 +500,8 @@ func (c *pdServiceDiscovery) Init() error { // We need to update the keyspace ID before we discover and update the service mode // so that TSO in API mode can be initialized with the correct keyspace ID. 
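 	// The lookup only runs for clients created with the null keyspace ID placeholder;
 	// a concrete keyspace ID passed in by the caller is kept as-is.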
- if c.updateKeyspaceIDCb != nil { - if err := c.updateKeyspaceIDCb(); err != nil { + if c.keyspaceID == nullKeyspaceID && c.updateKeyspaceIDFunc != nil { + if err := c.initRetry(c.updateKeyspaceIDFunc); err != nil { return err } } diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 443d455e911..617b709ca76 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -158,7 +158,7 @@ type tsoServiceDiscovery struct { // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service. func newTSOServiceDiscovery( ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery, - clusterID uint64, keyspaceID uint32, tlsCfg *tls.Config, option *option, + keyspaceID uint32, tlsCfg *tls.Config, option *option, ) ServiceDiscovery { ctx, cancel := context.WithCancel(ctx) c := &tsoServiceDiscovery{ @@ -166,7 +166,7 @@ func newTSOServiceDiscovery( cancel: cancel, metacli: metacli, apiSvcDiscovery: apiSvcDiscovery, - clusterID: clusterID, + clusterID: apiSvcDiscovery.GetClusterID(), tlsCfg: tlsCfg, option: option, checkMembershipCh: make(chan struct{}, 1), @@ -180,10 +180,10 @@ func newTSOServiceDiscovery( c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)} // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, // will be discovered later. - c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID) + c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, c.clusterID, defaultKeySpaceGroupID) log.Info("created tso service discovery", - zap.Uint64("cluster-id", clusterID), + zap.Uint64("cluster-id", c.clusterID), zap.Uint32("keyspace-id", keyspaceID), zap.String("default-discovery-key", c.defaultDiscoveryKey)) From 95e00f55b3c791fd93dc864512fb45c06bf2b10e Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 18 Oct 2024 13:03:22 +0800 Subject: [PATCH 06/15] test: make TestStoreWatch stable (#8724) close tikv/pd#8692 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/integrations/mcs/scheduling/meta_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index d134336966a..e55f0281d72 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -86,7 +86,8 @@ func (suite *metaTestSuite) TestStoreWatch() { ) } - suite.getRaftCluster().RemoveStore(2, false) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/doNotBuryStore", `return(true)`)) + re.NoError(suite.getRaftCluster().RemoveStore(2, false)) testutil.Eventually(re, func() bool { s := cluster.GetStore(2) if s == nil { @@ -95,6 +96,7 @@ func (suite *metaTestSuite) TestStoreWatch() { return s.GetState() == metapb.StoreState_Offline }) re.Len(cluster.GetStores(), 4) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/doNotBuryStore")) testutil.Eventually(re, func() bool { return cluster.GetStore(2).GetState() == metapb.StoreState_Tombstone }) From 41bd2cd2674a4260bbd7cd4c3639e484987ae7ac Mon Sep 17 00:00:00 2001 From: Shirly Date: Fri, 18 Oct 2024 13:33:40 +0800 Subject: [PATCH 07/15] resource-group:fix issue that the deleted resource group still appear in metrics (#8722) close tikv/pd#8716 resource-group: fix issue that the deleted resource group 
still appear in metrics Signed-off-by: AndreMouche Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/resourcemanager/server/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index dd7151a8b05..618b7ac6dba 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -451,7 +451,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { sqlCPUCost.DeleteLabelValues(r.name, r.name, r.ruType) requestCount.DeleteLabelValues(r.name, r.name, readTypeLabel) requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel) - availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType) + availableRUCounter.DeleteLabelValues(r.name, r.name) delete(m.consumptionRecord, r) delete(maxPerSecTrackers, r.name) readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) From f7c8e43faa18b5caaacdca53acb91796fc15700b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 18 Oct 2024 14:13:25 +0800 Subject: [PATCH 08/15] pd-ctl, config: adjust the output of pd config (#8695) close tikv/pd#8694 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/config/config.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index d8bd086225c..c64ee3831b0 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -133,14 +133,14 @@ type Config struct { AutoCompactionRetention string `toml:"auto-compaction-retention" json:"auto-compaction-retention-v2"` // TickInterval is the interval for etcd Raft tick. - TickInterval typeutil.Duration `toml:"tick-interval"` + TickInterval typeutil.Duration `toml:"tick-interval" json:"tick-interval"` // ElectionInterval is the interval for etcd Raft election. - ElectionInterval typeutil.Duration `toml:"election-interval"` + ElectionInterval typeutil.Duration `toml:"election-interval" json:"election-interval"` // Prevote is true to enable Raft Pre-Vote. // If enabled, Raft runs an additional election phase // to check whether it would get enough votes to win // an election, thus minimizing disruptions. - PreVote bool `toml:"enable-prevote"` + PreVote bool `toml:"enable-prevote" json:"enable-prevote"` MaxRequestBytes uint `toml:"max-request-bytes" json:"max-request-bytes"` @@ -149,12 +149,12 @@ type Config struct { LabelProperty LabelPropertyConfig `toml:"label-property" json:"label-property"` // For all warnings during parsing. 
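 	// These internal-only fields are tagged `json:"-"` below so they are omitted
 	// from the marshaled config returned by the PD API and pd-ctl.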
- WarningMsgs []string + WarningMsgs []string `json:"-"` - DisableStrictReconfigCheck bool + DisableStrictReconfigCheck bool `json:"-"` - HeartbeatStreamBindInterval typeutil.Duration - LeaderPriorityCheckInterval typeutil.Duration + HeartbeatStreamBindInterval typeutil.Duration `json:"-"` + LeaderPriorityCheckInterval typeutil.Duration `json:"-"` Logger *zap.Logger `json:"-"` LogProps *log.ZapProperties `json:"-"` From c43acaa7320b17d20265c3765f523de51f5c5de3 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 18 Oct 2024 15:24:04 +0800 Subject: [PATCH 09/15] test: make TestLabelerRuleTTL stable (#8723) close tikv/pd#8687 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/labeler/labeler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedule/labeler/labeler_test.go b/pkg/schedule/labeler/labeler_test.go index bd51bab7d83..34490a7249d 100644 --- a/pkg/schedule/labeler/labeler_test.go +++ b/pkg/schedule/labeler/labeler_test.go @@ -341,7 +341,7 @@ func TestKeyRange(t *testing.T) { func TestLabelerRuleTTL(t *testing.T) { re := require.New(t) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - labeler, err := NewRegionLabeler(context.Background(), store, time.Millisecond*10) + labeler, err := NewRegionLabeler(context.Background(), store, time.Minute) re.NoError(err) rules := []*LabelRule{ { From e01a7dd6cae869c5e92c7e4a795b9e008e94005e Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 18 Oct 2024 16:38:28 +0800 Subject: [PATCH 10/15] test: refactor real cluster test and implement cluster id case (#8685) close tikv/pd#8683, close tikv/pd#8684 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .gitignore | 1 + tests/integrations/realcluster/Makefile | 12 +- .../realcluster/cluster_id_test.go | 85 +++++ tests/integrations/realcluster/deploy.sh | 4 +- .../download_integration_test_binaries.sh | 80 +++++ .../integrations/realcluster/real_cluster.go | 169 +++++++++ .../realcluster/reboot_pd_test.go | 104 +++--- .../realcluster/scheduler_test.go | 326 +++++++++--------- tests/integrations/realcluster/ts_test.go | 52 ++- tests/integrations/realcluster/util.go | 20 +- 10 files changed, 583 insertions(+), 270 deletions(-) create mode 100644 tests/integrations/realcluster/cluster_id_test.go create mode 100644 tests/integrations/realcluster/download_integration_test_binaries.sh create mode 100644 tests/integrations/realcluster/real_cluster.go diff --git a/.gitignore b/.gitignore index a9d942fe706..08166309a52 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ go.work* embedded_assets_handler.go *.log *.bin +third_bin diff --git a/tests/integrations/realcluster/Makefile b/tests/integrations/realcluster/Makefile index 28c918ec2bf..8550f27c58a 100644 --- a/tests/integrations/realcluster/Makefile +++ b/tests/integrations/realcluster/Makefile @@ -28,9 +28,17 @@ tidy: git diff go.mod go.sum | cat git diff --quiet go.mod go.sum -check: deploy test kill_cluster +check: tiup test -deploy: kill_cluster +tiup: + # if tiup binary not exist, download it + if ! which tiup > /dev/null 2>&1; then \ + curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh; \ + fi + +deploy: kill_cluster deploy_only + +deploy_only: @ echo "deploying..." ./deploy.sh @ echo "wait cluster ready..." 
diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go new file mode 100644 index 00000000000..27bede7fa1d --- /dev/null +++ b/tests/integrations/realcluster/cluster_id_test.go @@ -0,0 +1,85 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "context" + "os/exec" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + pd "github.com/tikv/pd/client" +) + +type clusterIDSuite struct { + realClusterSuite +} + +func TestClusterID(t *testing.T) { + suite.Run(t, &clusterIDSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "cluster_id", + }, + }) +} + +func (s *clusterIDSuite) TestClientClusterID() { + re := require.New(s.T()) + ctx := context.Background() + // deploy second cluster + s.startRealCluster(s.T()) + defer s.stopRealCluster(s.T()) + + pdEndpoints := getPDEndpoints(s.T()) + // Try to create a client with the mixed endpoints. + _, err := pd.NewClientWithContext( + ctx, pdEndpoints, + pd.SecurityOption{}, pd.WithMaxErrorRetry(1), + ) + re.Error(err) + re.Contains(err.Error(), "unmatched cluster id") +} + +func getPDEndpoints(t *testing.T) []string { + cmd := exec.Command("sh", "-c", "ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'") + bytes, err := cmd.Output() + require.NoError(t, err) + pdAddrsForEachTikv := strings.Split(string(bytes), "\n") + var pdAddrs []string + for _, addr := range pdAddrsForEachTikv { + // length of addr is less than 5 means it must not be a valid address + if len(addr) < 5 { + continue + } + pdAddrs = append(pdAddrs, strings.Split(addr, ",")...) + } + return removeDuplicates(pdAddrs) +} + +func removeDuplicates(arr []string) []string { + uniqueMap := make(map[string]bool) + var result []string + + for _, item := range arr { + if _, exists := uniqueMap[item]; !exists { + uniqueMap[item] = true + result = append(result, item) + } + } + + return result +} diff --git a/tests/integrations/realcluster/deploy.sh b/tests/integrations/realcluster/deploy.sh index f6f567314f0..a7991d559b4 100755 --- a/tests/integrations/realcluster/deploy.sh +++ b/tests/integrations/realcluster/deploy.sh @@ -28,9 +28,9 @@ else # CI will download the binaries in the prepare phase. # ref https://github.com/PingCAP-QE/ci/blob/387e9e533b365174962ccb1959442a7070f9cd66/pipelines/tikv/pd/latest/pull_integration_realcluster_test.groovy#L55-L68 color-green "using existing binaries..." 
- $TIUP_BIN_DIR playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 --without-monitor \ + $TIUP_BIN_DIR playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 --without-monitor --tag pd_real_cluster_test \ --pd.binpath ./bin/pd-server --kv.binpath ./bin/tikv-server --db.binpath ./bin/tidb-server \ - --tiflash.binpath ./bin/tiflash --tag pd_real_cluster_test --pd.config ./tests/integrations/realcluster/pd.toml \ + --tiflash.binpath ./bin/tiflash --pd.config ./tests/integrations/realcluster/pd.toml \ > $CUR_PATH/playground.log 2>&1 & fi diff --git a/tests/integrations/realcluster/download_integration_test_binaries.sh b/tests/integrations/realcluster/download_integration_test_binaries.sh new file mode 100644 index 00000000000..8d4cc3411a8 --- /dev/null +++ b/tests/integrations/realcluster/download_integration_test_binaries.sh @@ -0,0 +1,80 @@ +#! /usr/bin/env bash + +# help +# download some third party tools for integration test +# example: ./download_integration_test_binaries.sh master + + +set -o errexit +set -o pipefail + + +# Specify which branch to be utilized for executing the test, which is +# exclusively accessible when obtaining binaries from +# http://fileserver.pingcap.net. +branch=${1:-master} +file_server_url=${2:-http://fileserver.pingcap.net} + +tidb_sha1_url="${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1" +tikv_sha1_url="${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1" +tiflash_sha1_url="${file_server_url}/download/refs/pingcap/tiflash/${branch}/sha1" + +tidb_sha1=$(curl "$tidb_sha1_url") +tikv_sha1=$(curl "$tikv_sha1_url") +tiflash_sha1=$(curl "$tiflash_sha1_url") + +# download tidb / tikv / tiflash binary build from tibuid multibranch pipeline +tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz" +tikv_download_url="${file_server_url}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" +tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" + +set -o nounset + +# See https://misc.flogisoft.com/bash/tip_colors_and_formatting. 
+color_green() { # Green + echo -e "\x1B[1;32m${*}\x1B[0m" +} + +function download() { + local url=$1 + local file_name=$2 + local file_path=$3 + if [[ -f "${file_path}" ]]; then + echo "file ${file_name} already exists, skip download" + return + fi + echo "download ${file_name} from ${url}" + wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}" +} + +function main() { + rm -rf third_bin + rm -rf tmp + mkdir third_bin + mkdir tmp + + # tidb server + download "$tidb_download_url" "tidb-server.tar.gz" "tmp/tidb-server.tar.gz" + tar -xzf tmp/tidb-server.tar.gz -C third_bin --wildcards 'bin/*' + mv third_bin/bin/* third_bin/ + + # TiKV server + download "$tikv_download_url" "tikv-server.tar.gz" "tmp/tikv-server.tar.gz" + tar -xzf tmp/tikv-server.tar.gz -C third_bin --wildcards 'bin/*' + mv third_bin/bin/* third_bin/ + + # TiFlash + download "$tiflash_download_url" "tiflash.tar.gz" "tmp/tiflash.tar.gz" + tar -xzf tmp/tiflash.tar.gz -C third_bin + mv third_bin/tiflash third_bin/_tiflash + mv third_bin/_tiflash/* third_bin && rm -rf third_bin/_tiflash + + chmod +x third_bin/* + rm -rf tmp + rm -rf third_bin/bin + ls -alh third_bin/ +} + +main "$@" + +color_green "Download SUCCESS" diff --git a/tests/integrations/realcluster/real_cluster.go b/tests/integrations/realcluster/real_cluster.go new file mode 100644 index 00000000000..99eca006954 --- /dev/null +++ b/tests/integrations/realcluster/real_cluster.go @@ -0,0 +1,169 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" +) + +type realClusterSuite struct { + suite.Suite + + clusterCnt int + suiteName string +} + +var tiupBin = os.Getenv("HOME") + "/.tiup/bin/tiup" + +// SetupSuite will run before the tests in the suite are run. +func (s *realClusterSuite) SetupSuite() { + t := s.T() + + // Clean the data dir. It is the default data dir of TiUP. + dataDir := filepath.Join(os.Getenv("HOME"), ".tiup", "data", "pd_real_cluster_test_"+s.suiteName+"_*") + matches, err := filepath.Glob(dataDir) + require.NoError(t, err) + + for _, match := range matches { + require.NoError(t, runCommand("rm", "-rf", match)) + } + s.startRealCluster(t) + t.Cleanup(func() { + s.stopRealCluster(t) + }) +} + +// TearDownSuite will run after all the tests in the suite have been run. +func (s *realClusterSuite) TearDownSuite() { + // Even if the cluster deployment fails, we still need to destroy the cluster. + // If the cluster does not fail to deploy, the cluster will be destroyed in + // the cleanup function. And these code will not work. 
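+	// stopRealCluster decrements clusterCnt before computing the tag, so increment it
+	// first to target the cluster whose deployment aborted before the counter grew.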
+ s.clusterCnt++ + s.stopRealCluster(s.T()) +} + +func (s *realClusterSuite) startRealCluster(t *testing.T) { + log.Info("start to deploy a real cluster") + + s.deploy(t) + s.clusterCnt++ +} + +func (s *realClusterSuite) stopRealCluster(t *testing.T) { + s.clusterCnt-- + + log.Info("start to destroy a real cluster", zap.String("tag", s.tag())) + destroy(t, s.tag()) + time.Sleep(5 * time.Second) +} + +func (s *realClusterSuite) tag() string { + return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt) +} + +// func restartTiUP() { +// log.Info("start to restart TiUP") +// cmd := exec.Command("make", "deploy") +// cmd.Stdout = os.Stdout +// cmd.Stderr = os.Stderr +// err := cmd.Run() +// if err != nil { +// panic(err) +// } +// log.Info("TiUP restart success") +// } + +func (s *realClusterSuite) deploy(t *testing.T) { + tag := s.tag() + deployTiupPlayground(t, tag) + waitTiupReady(t, tag) +} + +func destroy(t *testing.T, tag string) { + cmdStr := fmt.Sprintf("ps -ef | grep 'tiup playground' | grep %s | awk '{print $2}' | head -n 1", tag) + cmd := exec.Command("sh", "-c", cmdStr) + bytes, err := cmd.Output() + require.NoError(t, err) + pid := string(bytes) + // nolint:errcheck + runCommand("sh", "-c", "kill -9 "+pid) + log.Info("destroy success", zap.String("pid", pid)) +} + +func deployTiupPlayground(t *testing.T, tag string) { + curPath, err := os.Getwd() + require.NoError(t, err) + + log.Info(curPath) + require.NoError(t, os.Chdir("../../..")) + + if !fileExists("third_bin") || !fileExists("third_bin/tikv-server") || !fileExists("third_bin/tidb-server") || !fileExists("third_bin/tiflash") { + log.Info("downloading binaries...") + log.Info("this may take a few minutes, you can also download them manually and put them in the bin directory.") + require.NoError(t, runCommand("sh", + "./tests/integrations/realcluster/download_integration_test_binaries.sh")) + } + if !fileExists("bin") || !fileExists("bin/pd-server") { + log.Info("complie pd binaries...") + require.NoError(t, runCommand("make", "pd-server")) + } + if !fileExists(filepath.Join(curPath, "playground")) { + require.NoError(t, os.Mkdir(filepath.Join(curPath, "playground"), 0755)) + } + // nolint:errcheck + go runCommand("sh", "-c", + tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \ + --without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \ + // --kv.binpath ./third_bin/tikv-server \ + // --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ + --kv.binpath ./bin/tikv-server \ + --db.binpath ./bin/tidb-server --tiflash.binpath ./bin/tiflash \ + --pd.config ./tests/integrations/realcluster/pd.toml \ + > `+filepath.Join(curPath, "playground", tag+".log")+` 2>&1 & `) + + // Avoid to change the dir before execute `tiup playground`. 
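+	// Give the backgrounded `tiup playground` process time to start before switching
+	// the working directory back.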
+ time.Sleep(10 * time.Second) + require.NoError(t, os.Chdir(curPath)) +} + +func waitTiupReady(t *testing.T, tag string) { + const ( + interval = 5 + maxTimes = 20 + ) + log.Info("start to wait TiUP ready", zap.String("tag", tag)) + for i := 0; i < maxTimes; i++ { + err := runCommand(tiupBin, "playground", "display", "--tag", tag) + if err == nil { + log.Info("TiUP is ready", zap.String("tag", tag)) + return + } + + log.Info("TiUP is not ready, will retry", zap.Int("retry times", i), + zap.String("tag", tag), zap.Error(err)) + time.Sleep(time.Duration(interval) * time.Second) + } + require.Failf(t, "TiUP is not ready", "tag: %s", tag) +} diff --git a/tests/integrations/realcluster/reboot_pd_test.go b/tests/integrations/realcluster/reboot_pd_test.go index 9f2b286e9b1..50b4bee2055 100644 --- a/tests/integrations/realcluster/reboot_pd_test.go +++ b/tests/integrations/realcluster/reboot_pd_test.go @@ -14,67 +14,45 @@ package realcluster -import ( - "context" - "os" - "os/exec" - "testing" - - "github.com/pingcap/log" - "github.com/stretchr/testify/require" -) - -func restartTiUP() { - log.Info("start to restart TiUP") - cmd := exec.Command("make", "deploy") - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Run() - if err != nil { - panic(err) - } - log.Info("TiUP restart success") -} - // https://github.com/tikv/pd/issues/6467 -func TestReloadLabel(t *testing.T) { - re := require.New(t) - ctx := context.Background() - - resp, err := pdHTTPCli.GetStores(ctx) - re.NoError(err) - re.NotEmpty(resp.Stores) - firstStore := resp.Stores[0] - // TiFlash labels will be ["engine": "tiflash"] - // So we need to merge the labels - storeLabels := map[string]string{ - "zone": "zone1", - } - for _, label := range firstStore.Store.Labels { - storeLabels[label.Key] = label.Value - } - re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) - defer func() { - re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) - }() - - checkLabelsAreEqual := func() { - resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) - re.NoError(err) - - labelsMap := make(map[string]string) - for _, label := range resp.Store.Labels { - re.NotNil(label) - labelsMap[label.Key] = label.Value - } - - for key, value := range storeLabels { - re.Equal(value, labelsMap[key]) - } - } - // Check the label is set - checkLabelsAreEqual() - // Restart TiUP to reload the label - restartTiUP() - checkLabelsAreEqual() -} +// func TestReloadLabel(t *testing.T) { +// re := require.New(t) +// ctx := context.Background() + +// resp, err := pdHTTPCli.GetStores(ctx) +// re.NoError(err) +// re.NotEmpty(resp.Stores) +// firstStore := resp.Stores[0] +// // TiFlash labels will be ["engine": "tiflash"] +// // So we need to merge the labels +// storeLabels := map[string]string{ +// "zone": "zone1", +// } +// for _, label := range firstStore.Store.Labels { +// storeLabels[label.Key] = label.Value +// } +// re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) +// defer func() { +// re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) +// }() + +// checkLabelsAreEqual := func() { +// resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) +// re.NoError(err) + +// labelsMap := make(map[string]string) +// for _, label := range resp.Store.Labels { +// re.NotNil(label) +// labelsMap[label.Key] = label.Value +// } + +// for key, value := range storeLabels { +// re.Equal(value, labelsMap[key]) +// } +// } +// // Check the label is set +// 
checkLabelsAreEqual() +// // Restart TiUP to reload the label +// restartTiUP() +// checkLabelsAreEqual() +// } diff --git a/tests/integrations/realcluster/scheduler_test.go b/tests/integrations/realcluster/scheduler_test.go index 98a18158114..7e5087627fb 100644 --- a/tests/integrations/realcluster/scheduler_test.go +++ b/tests/integrations/realcluster/scheduler_test.go @@ -14,175 +14,161 @@ package realcluster -import ( - "context" - "fmt" - "sort" - "testing" - "time" - - "github.com/stretchr/testify/require" - pd "github.com/tikv/pd/client/http" - "github.com/tikv/pd/client/testutil" - "github.com/tikv/pd/pkg/schedule/labeler" - "github.com/tikv/pd/pkg/schedule/types" -) - // https://github.com/tikv/pd/issues/6988#issuecomment-1694924611 // https://github.com/tikv/pd/issues/6897 -func TestTransferLeader(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - resp, err := pdHTTPCli.GetLeader(ctx) - re.NoError(err) - oldLeader := resp.Name - - var newLeader string - for i := 0; i < 2; i++ { - if resp.Name != fmt.Sprintf("pd-%d", i) { - newLeader = fmt.Sprintf("pd-%d", i) - } - } - - // record scheduler - re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), 1)) - defer func() { - re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) - }() - res, err := pdHTTPCli.GetSchedulers(ctx) - re.NoError(err) - oldSchedulersLen := len(res) - - re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) - // wait for transfer leader to new leader - time.Sleep(1 * time.Second) - resp, err = pdHTTPCli.GetLeader(ctx) - re.NoError(err) - re.Equal(newLeader, resp.Name) - - res, err = pdHTTPCli.GetSchedulers(ctx) - re.NoError(err) - re.Len(res, oldSchedulersLen) - - // transfer leader to old leader - re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) - // wait for transfer leader - time.Sleep(1 * time.Second) - resp, err = pdHTTPCli.GetLeader(ctx) - re.NoError(err) - re.Equal(oldLeader, resp.Name) - - res, err = pdHTTPCli.GetSchedulers(ctx) - re.NoError(err) - re.Len(res, oldSchedulersLen) -} - -func TestRegionLabelDenyScheduler(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - regions, err := pdHTTPCli.GetRegions(ctx) - re.NoError(err) - re.NotEmpty(regions.Regions) - region1 := regions.Regions[0] - - err = pdHTTPCli.DeleteScheduler(ctx, types.BalanceLeaderScheduler.String()) - if err == nil { - defer func() { - pdHTTPCli.CreateScheduler(ctx, types.BalanceLeaderScheduler.String(), 0) - }() - } - - re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) - defer func() { - pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) - }() - - // wait leader transfer - testutil.Eventually(re, func() bool { - regions, err := pdHTTPCli.GetRegions(ctx) - re.NoError(err) - for _, region := range regions.Regions { - if region.Leader.StoreID != region1.Leader.StoreID { - return false - } - } - return true - }, testutil.WithWaitFor(time.Minute)) - - // disable schedule for region1 - labelRule := &pd.LabelRule{ - ID: "rule1", - Labels: []pd.RegionLabel{{Key: "schedule", Value: "deny"}}, - RuleType: "key-range", - Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), - } - re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) - defer func() { - pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) - }() - labelRules, err := 
pdHTTPCli.GetAllRegionLabelRules(ctx) - re.NoError(err) - re.Len(labelRules, 2) - sort.Slice(labelRules, func(i, j int) bool { - return labelRules[i].ID < labelRules[j].ID - }) - re.Equal(labelRule.ID, labelRules[1].ID) - re.Equal(labelRule.Labels, labelRules[1].Labels) - re.Equal(labelRule.RuleType, labelRules[1].RuleType) - - // enable evict leader scheduler, and check it works - re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String())) - re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), uint64(region1.Leader.StoreID))) - defer func() { - pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String()) - }() - testutil.Eventually(re, func() bool { - regions, err := pdHTTPCli.GetRegions(ctx) - re.NoError(err) - for _, region := range regions.Regions { - if region.Leader.StoreID == region1.Leader.StoreID { - return false - } - } - return true - }, testutil.WithWaitFor(time.Minute)) - - re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) - re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) - defer func() { - pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) - }() - testutil.Eventually(re, func() bool { - regions, err := pdHTTPCli.GetRegions(ctx) - re.NoError(err) - for _, region := range regions.Regions { - if region.ID == region1.ID { - continue - } - if region.Leader.StoreID != region1.Leader.StoreID { - return false - } - } - return true - }, testutil.WithWaitFor(time.Minute)) - - pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) - labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) - re.NoError(err) - re.Len(labelRules, 1) - - testutil.Eventually(re, func() bool { - regions, err := pdHTTPCli.GetRegions(ctx) - re.NoError(err) - for _, region := range regions.Regions { - if region.Leader.StoreID != region1.Leader.StoreID { - return false - } - } - return true - }, testutil.WithWaitFor(time.Minute)) -} +// func TestTransferLeader(t *testing.T) { +// re := require.New(t) +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() + +// resp, err := pdHTTPCli.GetLeader(ctx) +// re.NoError(err) +// oldLeader := resp.Name + +// var newLeader string +// for i := 0; i < 2; i++ { +// if resp.Name != fmt.Sprintf("pd-%d", i) { +// newLeader = fmt.Sprintf("pd-%d", i) +// } +// } + +// // record scheduler +// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), 1)) +// defer func() { +// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) +// }() +// res, err := pdHTTPCli.GetSchedulers(ctx) +// re.NoError(err) +// oldSchedulersLen := len(res) + +// re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) +// // wait for transfer leader to new leader +// time.Sleep(1 * time.Second) +// resp, err = pdHTTPCli.GetLeader(ctx) +// re.NoError(err) +// re.Equal(newLeader, resp.Name) + +// res, err = pdHTTPCli.GetSchedulers(ctx) +// re.NoError(err) +// re.Len(res, oldSchedulersLen) + +// // transfer leader to old leader +// re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) +// // wait for transfer leader +// time.Sleep(1 * time.Second) +// resp, err = pdHTTPCli.GetLeader(ctx) +// re.NoError(err) +// re.Equal(oldLeader, resp.Name) + +// res, err = pdHTTPCli.GetSchedulers(ctx) +// re.NoError(err) +// re.Len(res, oldSchedulersLen) +// } + +// func TestRegionLabelDenyScheduler(t *testing.T) { +// re := require.New(t) +// ctx, cancel := 
context.WithCancel(context.Background()) +// defer cancel() + +// regions, err := pdHTTPCli.GetRegions(ctx) +// re.NoError(err) +// re.NotEmpty(regions.Regions) +// region1 := regions.Regions[0] + +// err = pdHTTPCli.DeleteScheduler(ctx, types.BalanceLeaderScheduler.String()) +// if err == nil { +// defer func() { +// pdHTTPCli.CreateScheduler(ctx, types.BalanceLeaderScheduler.String(), 0) +// }() +// } + +// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) +// defer func() { +// pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) +// }() + +// // wait leader transfer +// testutil.Eventually(re, func() bool { +// regions, err := pdHTTPCli.GetRegions(ctx) +// re.NoError(err) +// for _, region := range regions.Regions { +// if region.Leader.StoreID != region1.Leader.StoreID { +// return false +// } +// } +// return true +// }, testutil.WithWaitFor(time.Minute)) + +// // disable schedule for region1 +// labelRule := &pd.LabelRule{ +// ID: "rule1", +// Labels: []pd.RegionLabel{{Key: "schedule", Value: "deny"}}, +// RuleType: "key-range", +// Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), +// } +// re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) +// defer func() { +// pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) +// }() +// labelRules, err := pdHTTPCli.GetAllRegionLabelRules(ctx) +// re.NoError(err) +// re.Len(labelRules, 2) +// sort.Slice(labelRules, func(i, j int) bool { +// return labelRules[i].ID < labelRules[j].ID +// }) +// re.Equal(labelRule.ID, labelRules[1].ID) +// re.Equal(labelRule.Labels, labelRules[1].Labels) +// re.Equal(labelRule.RuleType, labelRules[1].RuleType) + +// // enable evict leader scheduler, and check it works +// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String())) +// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), uint64(region1.Leader.StoreID))) +// defer func() { +// pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String()) +// }() +// testutil.Eventually(re, func() bool { +// regions, err := pdHTTPCli.GetRegions(ctx) +// re.NoError(err) +// for _, region := range regions.Regions { +// if region.Leader.StoreID == region1.Leader.StoreID { +// return false +// } +// } +// return true +// }, testutil.WithWaitFor(time.Minute)) + +// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) +// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) +// defer func() { +// pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) +// }() +// testutil.Eventually(re, func() bool { +// regions, err := pdHTTPCli.GetRegions(ctx) +// re.NoError(err) +// for _, region := range regions.Regions { +// if region.ID == region1.ID { +// continue +// } +// if region.Leader.StoreID != region1.Leader.StoreID { +// return false +// } +// } +// return true +// }, testutil.WithWaitFor(time.Minute)) + +// pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) +// labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) +// re.NoError(err) +// re.Len(labelRules, 1) + +// testutil.Eventually(re, func() bool { +// regions, err := pdHTTPCli.GetRegions(ctx) +// re.NoError(err) +// for _, region := range regions.Regions { +// if region.Leader.StoreID != region1.Leader.StoreID { +// return false +// } +// } +// return true +// }, 
testutil.WithWaitFor(time.Minute)) +// } diff --git a/tests/integrations/realcluster/ts_test.go b/tests/integrations/realcluster/ts_test.go index 5d970556fbc..156e3d63e71 100644 --- a/tests/integrations/realcluster/ts_test.go +++ b/tests/integrations/realcluster/ts_test.go @@ -14,32 +14,26 @@ package realcluster -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestTS(t *testing.T) { - re := require.New(t) - - db := OpenTestDB(t) - db.MustExec("use test") - db.MustExec("drop table if exists t") - db.MustExec("create table t(a int, index i(a))") - db.MustExec("insert t values (1), (2), (3)") - var rows int - err := db.inner.Raw("select count(*) from t").Row().Scan(&rows) - re.NoError(err) - re.Equal(3, rows) - - re.NoError(err) - re.Equal(3, rows) - - var ts uint64 - err = db.inner.Begin().Raw("select @@tidb_current_ts").Scan(&ts).Rollback().Error - re.NoError(err) - re.NotEqual(0, GetTimeFromTS(ts)) - - db.MustClose() -} +// func TestTS(t *testing.T) { +// re := require.New(t) + +// db := OpenTestDB(t) +// db.MustExec("use test") +// db.MustExec("drop table if exists t") +// db.MustExec("create table t(a int, index i(a))") +// db.MustExec("insert t values (1), (2), (3)") +// var rows int +// err := db.inner.Raw("select count(*) from t").Row().Scan(&rows) +// re.NoError(err) +// re.Equal(3, rows) + +// re.NoError(err) +// re.Equal(3, rows) + +// var ts uint64 +// err = db.inner.Begin().Raw("select @@tidb_current_ts").Scan(&ts).Rollback().Error +// re.NoError(err) +// re.NotEqual(0, GetTimeFromTS(ts)) + +// db.MustClose() +// } diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index f6c8295b6ef..013c41da7f3 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -15,16 +15,16 @@ package realcluster import ( + "os" + "os/exec" "time" - - "github.com/tikv/pd/client/http" ) const physicalShiftBits = 18 var ( - pdAddrs = []string{"http://127.0.0.1:2379"} - pdHTTPCli = http.NewClient("pd-real-cluster-test", pdAddrs) +// pdAddrs = []string{"http://127.0.0.1:2379"} +// pdHTTPCli = http.NewClient("pd-real-cluster-test", pdAddrs) ) // GetTimeFromTS extracts time.Time from a timestamp. @@ -37,3 +37,15 @@ func GetTimeFromTS(ts uint64) time.Time { func ExtractPhysical(ts uint64) int64 { return int64(ts >> physicalShiftBits) } + +func runCommand(name string, args ...string) error { + cmd := exec.Command(name, args...) 
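+	// Forward the child process's output to the test runner's stdout/stderr so failures from `tiup`/`make` are visible in the test logs.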
+ cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + return !os.IsNotExist(err) +} From 003def6c741dcf8d785d35333af95e4f7fae290b Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Mon, 21 Oct 2024 17:22:47 +0800 Subject: [PATCH 11/15] realclustertest: update binary dir (#8731) ref tikv/pd#8683 Signed-off-by: okJiang <819421878@qq.com> --- .../integrations/realcluster/real_cluster.go | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/tests/integrations/realcluster/real_cluster.go b/tests/integrations/realcluster/real_cluster.go index 99eca006954..21284f285b0 100644 --- a/tests/integrations/realcluster/real_cluster.go +++ b/tests/integrations/realcluster/real_cluster.go @@ -35,7 +35,18 @@ type realClusterSuite struct { suiteName string } -var tiupBin = os.Getenv("HOME") + "/.tiup/bin/tiup" +var ( + playgroundLogDir = filepath.Join("tmp", "real_cluster", "playground") + tiupBin string +) + +func init() { + var err error + tiupBin, err = exec.LookPath("tiup") + if err != nil { + panic(err) + } +} // SetupSuite will run before the tests in the suite are run. func (s *realClusterSuite) SetupSuite() { @@ -115,8 +126,6 @@ func destroy(t *testing.T, tag string) { func deployTiupPlayground(t *testing.T, tag string) { curPath, err := os.Getwd() require.NoError(t, err) - - log.Info(curPath) require.NoError(t, os.Chdir("../../..")) if !fileExists("third_bin") || !fileExists("third_bin/tikv-server") || !fileExists("third_bin/tidb-server") || !fileExists("third_bin/tiflash") { @@ -129,19 +138,20 @@ func deployTiupPlayground(t *testing.T, tag string) { log.Info("complie pd binaries...") require.NoError(t, runCommand("make", "pd-server")) } - if !fileExists(filepath.Join(curPath, "playground")) { - require.NoError(t, os.Mkdir(filepath.Join(curPath, "playground"), 0755)) + + if !fileExists(playgroundLogDir) { + require.NoError(t, os.MkdirAll(playgroundLogDir, 0755)) } // nolint:errcheck - go runCommand("sh", "-c", - tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \ + go func() { + runCommand("sh", "-c", + tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \ --without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \ - // --kv.binpath ./third_bin/tikv-server \ - // --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ - --kv.binpath ./bin/tikv-server \ - --db.binpath ./bin/tidb-server --tiflash.binpath ./bin/tiflash \ + --kv.binpath ./third_bin/tikv-server \ + --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ --pd.config ./tests/integrations/realcluster/pd.toml \ - > `+filepath.Join(curPath, "playground", tag+".log")+` 2>&1 & `) + > `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `) + }() // Avoid to change the dir before execute `tiup playground`. 
time.Sleep(10 * time.Second) From 25629f2dbd1d293b1a99dde0272621a153ca346a Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 22 Oct 2024 11:59:36 +0800 Subject: [PATCH 12/15] ms/tso: move `startGlobalAllocatorLoop` outside `NewAllocatorManager` (#8725) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/tso/allocator_manager.go | 18 ++++++++++++++---- pkg/tso/global_allocator.go | 8 ++------ pkg/tso/keyspace_group_manager.go | 3 ++- server/server.go | 2 +- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 56ee8313d57..a02d4884e17 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -202,7 +202,6 @@ func NewAllocatorManager( rootPath string, storage endpoint.TSOStorage, cfg Config, - startGlobalLeaderLoop bool, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -224,7 +223,7 @@ func NewAllocatorManager( am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. - am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop) + am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership()) am.svcLoopWG.Add(1) go am.tsoAllocatorLoop() @@ -234,11 +233,11 @@ func NewAllocatorManager( // SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into // an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times // depending on the election. -func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) { +func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) { am.mu.Lock() defer am.mu.Unlock() - allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop) + allocator := NewGlobalTSOAllocator(ctx, am) // Create a new allocatorGroup ctx, cancel := context.WithCancel(ctx) am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{ @@ -1389,3 +1388,14 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +func (am *AllocatorManager) startGlobalAllocatorLoop() { + globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) + if !ok { + // it should never happen + log.Error("failed to start global allocator loop, global allocator not found") + return + } + globalTSOAllocator.wg.Add(1) + go globalTSOAllocator.primaryElectionLoop() +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 53a6f65a25d..5c7c905089c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct { func NewGlobalTSOAllocator( ctx context.Context, am *AllocatorManager, - startGlobalLeaderLoop bool, ) Allocator { ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator( tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation), } - if startGlobalLeaderLoop { - gta.wg.Add(1) - go gta.primaryElectionLoop() - } - return gta } @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// primaryElectionLoop is used to maintain the TSO 
primary election and TSO's +// running allocator. It is only used in API mode. func (gta *GlobalTSOAllocator) primaryElectionLoop() { defer logutil.LogPanic() defer gta.wg.Done() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7c1d3426ce9..c19d790efc5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) + am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/server/server.go b/server/server.go index c79f51d8153..9691633bae2 100644 --- a/server/server.go +++ b/server/server.go @@ -475,7 +475,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { From 0402e15a5f2c727f99c7923cb1db944612c39c28 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 22 Oct 2024 14:58:56 +0800 Subject: [PATCH 13/15] *: move tso to independent thread (#8720) ref tikv/pd#8477 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 4 +- client/pd_service_discovery.go | 3 + client/tso_service_discovery.go | 3 + server/cluster/cluster.go | 94 ++++++++++++++++++---- server/server.go | 26 +----- tests/integrations/tso/client_test.go | 4 +- tests/integrations/tso/consistency_test.go | 1 + tests/integrations/tso/server_test.go | 1 + tests/server/cluster/cluster_test.go | 6 +- tests/server/tso/allocator_test.go | 3 + tests/server/tso/global_tso_test.go | 3 + tests/server/tso/tso_test.go | 4 +- tools/pd-backup/tests/backup_test.go | 2 + 13 files changed, 105 insertions(+), 49 deletions(-) diff --git a/client/client.go b/client/client.go index f8c8d32cee8..9ced7284153 100644 --- a/client/client.go +++ b/client/client.go @@ -206,9 +206,7 @@ func (k *serviceModeKeeper) close() { k.tsoSvcDiscovery.Close() fallthrough case pdpb.ServiceMode_PD_SVC_MODE: - if k.tsoClient != nil { - k.tsoClient.close() - } + k.tsoClient.close() case pdpb.ServiceMode_UNKNOWN_SVC_MODE: } } diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index f42ae7fea4a..83bc8e612a3 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -634,6 +634,9 @@ func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) { // Close releases all resources. 
func (c *pdServiceDiscovery) Close() { + if c == nil { + return + } c.closeOnce.Do(func() { log.Info("[pd] close pd service discovery client") c.clientConns.Range(func(key, cc any) bool { diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 617b709ca76..0380ddb4c28 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -226,6 +226,9 @@ func (c *tsoServiceDiscovery) retry( // Close releases all resources func (c *tsoServiceDiscovery) Close() { + if c == nil { + return + } log.Info("closing tso service discovery") c.cancel() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4cce39fa093..3869308d9dc 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "encoding/json" + errorspkg "errors" "fmt" "io" "math" @@ -43,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/ratelimit" @@ -56,6 +58,7 @@ import ( "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/unsaferecovery" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" @@ -88,12 +91,13 @@ const ( // nodeStateCheckJobInterval is the interval to run node state check job. nodeStateCheckJobInterval = 10 * time.Second // metricsCollectionJobInterval is the interval to run metrics collection job. - metricsCollectionJobInterval = 10 * time.Second - updateStoreStatsInterval = 9 * time.Millisecond - clientTimeout = 3 * time.Second - defaultChangedRegionsLimit = 10000 - gcTombstoneInterval = 30 * 24 * time.Hour - serviceCheckInterval = 10 * time.Second + metricsCollectionJobInterval = 10 * time.Second + updateStoreStatsInterval = 9 * time.Millisecond + clientTimeout = 3 * time.Second + defaultChangedRegionsLimit = 10000 + gcTombstoneInterval = 30 * 24 * time.Hour + schedulingServiceCheckInterval = 10 * time.Second + tsoServiceCheckInterval = 100 * time.Millisecond // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -144,6 +148,7 @@ type RaftCluster struct { cancel context.CancelFunc *core.BasicCluster // cached cluster info + member *member.EmbeddedEtcdMember etcdClient *clientv3.Client httpClient *http.Client @@ -174,6 +179,7 @@ type RaftCluster struct { keyspaceGroupManager *keyspace.GroupManager independentServices sync.Map hbstreams *hbstream.HeartbeatStreams + tsoAllocator *tso.AllocatorManager // heartbeatRunner is used to process the subtree update task asynchronously. heartbeatRunner ratelimit.Runner @@ -194,16 +200,18 @@ type Status struct { } // NewRaftCluster create a new cluster. 
-func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, - httpClient *http.Client) *RaftCluster { +func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, + httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster { return &RaftCluster{ serverCtx: ctx, clusterID: clusterID, + member: member, regionSyncer: regionSyncer, httpClient: httpClient, etcdClient: etcdClient, BasicCluster: basicCluster, storage: storage, + tsoAllocator: tsoAllocator, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), @@ -314,11 +322,13 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } + c.checkTSOService() cluster, err := c.LoadClusterInfo() if err != nil { return err } if cluster == nil { + log.Warn("cluster is not bootstrapped") return nil } @@ -351,7 +361,7 @@ func (c *RaftCluster) Start(s Server) error { return err } } - c.checkServices() + c.checkSchedulingService() c.wg.Add(9) go c.runServiceCheckJob() go c.runMetricsCollectionJob() @@ -370,7 +380,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } -func (c *RaftCluster) checkServices() { +func (c *RaftCluster) checkSchedulingService() { if c.isAPIServiceMode { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName) if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { @@ -390,25 +400,76 @@ func (c *RaftCluster) checkServices() { } } +// checkTSOService checks the TSO service. +func (c *RaftCluster) checkTSOService() { + if c.isAPIServiceMode { + return + } + + if err := c.startTSOJobs(); err != nil { + // If there is an error, need to wait for the next check. 
+ log.Error("failed to start TSO jobs", errs.ZapError(err)) + return + } +} + func (c *RaftCluster) runServiceCheckJob() { defer logutil.LogPanic() defer c.wg.Done() - ticker := time.NewTicker(serviceCheckInterval) + schedulingTicker := time.NewTicker(schedulingServiceCheckInterval) failpoint.Inject("highFrequencyClusterJobs", func() { - ticker.Reset(time.Millisecond) + schedulingTicker.Reset(time.Millisecond) }) - defer ticker.Stop() + defer schedulingTicker.Stop() for { select { case <-c.ctx.Done(): log.Info("service check job is stopped") return - case <-ticker.C: - c.checkServices() + case <-schedulingTicker.C: + c.checkSchedulingService() + } + } +} + +func (c *RaftCluster) startTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if !allocator.IsInitialize() { + log.Info("initializing the global TSO allocator") + if err := allocator.Initialize(0); err != nil { + log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) + return err } } + return nil +} + +func (c *RaftCluster) stopTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if allocator.IsInitialize() { + c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) + failpoint.Inject("updateAfterResetTSO", func() { + allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { + log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) + } + if allocator.IsInitialize() { + log.Panic("the allocator should be uninitialized after reset") + } + }) + } + + return nil } // startGCTuner @@ -757,6 +818,9 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(constant.SchedulingServiceName) { c.stopSchedulingJobs() } + if err := c.stopTSOJobs(); err != nil { + log.Error("failed to stop tso jobs", errs.ZapError(err)) + } c.heartbeatRunner.Stop() c.miscRunner.Stop() c.logRunner.Stop() diff --git a/server/server.go b/server/server.go index 9691633bae2..26f8ebb614c 100644 --- a/server/server.go +++ b/server/server.go @@ -17,7 +17,6 @@ package server import ( "bytes" "context" - errorspkg "errors" "fmt" "math/rand" "net/http" @@ -490,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error { s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() - s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient) + s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -1715,29 +1714,6 @@ func (s *Server) campaignLeader() { s.member.KeepLeader(ctx) log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name())) - if !s.IsAPIServiceMode() { - allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) - if err != nil { - log.Error("failed to get the global TSO allocator", errs.ZapError(err)) - return - } - log.Info("initializing the global TSO allocator") - if err := allocator.Initialize(0); 
err != nil { - log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) - return - } - defer func() { - s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false) - failpoint.Inject("updateAfterResetTSO", func() { - if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { - log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) - } - if allocator.IsInitialize() { - log.Panic("the allocator should be uninitialized after reset") - } - }) - }() - } if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", errs.ZapError(err)) return diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a669e093200..d1a649cbfa6 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { return err == nil }) // Resign leader to trigger the TSO resetting. - re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)")) oldLeaderName := suite.cluster.WaitLeader() re.NotEmpty(oldLeaderName) err := suite.cluster.GetServer(oldLeaderName).ResignLeader() re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO")) newLeaderName := suite.cluster.WaitLeader() re.NotEmpty(newLeaderName) re.NotEqual(oldLeaderName, newLeaderName) diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go index 9ebe6dec8af..f82f58ee6c8 100644 --- a/tests/integrations/tso/consistency_test.go +++ b/tests/integrations/tso/consistency_test.go @@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() re.NotEmpty(leaderName) suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.pdLeaderServer.BootstrapCluster() backendEndpoints := suite.pdLeaderServer.GetAddr() if suite.legacy { suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints) diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index 5e14611ab65..651a1df96b4 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -79,6 +79,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() re.NotEmpty(leaderName) suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.pdLeaderServer.BootstrapCluster() backendEndpoints := suite.pdLeaderServer.GetAddr() if suite.legacy { suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index a9be92d19e9..e1a56982f2d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), 
svr.GetTSOAllocatorManager()) // Cluster is not bootstrapped. rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) @@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) { } re.NoError(testStorage.Flush()) - raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) @@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123} diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go index 692aec490eb..257cd3b6a34 100644 --- a/tests/server/tso/allocator_test.go +++ b/tests/server/tso/allocator_test.go @@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitAllLeaders(re, dcLocationConfig) + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() // Wait for all nodes becoming healthy. time.Sleep(time.Second * 5) diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go index d8f64afe871..c340c44d3d2 100644 --- a/tests/server/tso/global_tso_test.go +++ b/tests/server/tso/global_tso_test.go @@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) { var leaderServer, nextLeaderServer *tests.TestServer leaderServer = cluster.GetLeaderServer() re.NotNil(leaderServer) + leaderServer.BootstrapCluster() for _, s := range cluster.GetServers() { if s.GetConfig().Name != cluster.GetLeader() { nextLeaderServer = s @@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index 5be37e293cf..fc2f5999840 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitAllLeaders(re, dcLocationConfig) + leaderServer := cluster.GetLeaderServer() + leaderServer.BootstrapCluster() requestLocalTSOs(re, cluster, dcLocationConfig) // Reboot the cluster. @@ -125,7 +127,7 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) // Re-request the global TSOs. 
- leaderServer := cluster.GetLeaderServer() + leaderServer = cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() req := &pdpb.TsoRequest{ diff --git a/tools/pd-backup/tests/backup_test.go b/tools/pd-backup/tests/backup_test.go index 2a55a790849..05d2b7b92ed 100644 --- a/tools/pd-backup/tests/backup_test.go +++ b/tools/pd-backup/tests/backup_test.go @@ -37,6 +37,8 @@ func TestBackup(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) re.NotEmpty(cluster.WaitLeader()) + leaderServer := cluster.GetLeaderServer() + leaderServer.BootstrapCluster() pdAddr := cluster.GetConfig().GetClientURL() urls := strings.Split(pdAddr, ",") defer cluster.Destroy() From b155a7b2e29449cd8216233debbe21c9c5c3c14f Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 22 Oct 2024 15:31:38 +0800 Subject: [PATCH 14/15] realclustertest: apply new style for some test cases (#8732) ref tikv/pd#8683 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .../integrations/realcluster/real_cluster.go | 58 ++- .../realcluster/reboot_pd_test.go | 105 +++--- .../realcluster/scheduler_test.go | 341 ++++++++++-------- tests/integrations/realcluster/ts_test.go | 65 ++-- tests/integrations/realcluster/util.go | 5 - 5 files changed, 314 insertions(+), 260 deletions(-) diff --git a/tests/integrations/realcluster/real_cluster.go b/tests/integrations/realcluster/real_cluster.go index 21284f285b0..1843b78a528 100644 --- a/tests/integrations/realcluster/real_cluster.go +++ b/tests/integrations/realcluster/real_cluster.go @@ -19,6 +19,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "testing" "time" @@ -37,17 +38,9 @@ type realClusterSuite struct { var ( playgroundLogDir = filepath.Join("tmp", "real_cluster", "playground") - tiupBin string + tiupBin = os.Getenv("HOME") + "/.tiup/bin/tiup" ) -func init() { - var err error - tiupBin, err = exec.LookPath("tiup") - if err != nil { - panic(err) - } -} - // SetupSuite will run before the tests in the suite are run. 
func (s *realClusterSuite) SetupSuite() { t := s.T() @@ -78,7 +71,9 @@ func (s *realClusterSuite) TearDownSuite() { func (s *realClusterSuite) startRealCluster(t *testing.T) { log.Info("start to deploy a real cluster") - s.deploy(t) + tag := s.tag() + deployTiupPlayground(t, tag) + waitTiupReady(t, tag) s.clusterCnt++ } @@ -94,33 +89,26 @@ func (s *realClusterSuite) tag() string { return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt) } -// func restartTiUP() { -// log.Info("start to restart TiUP") -// cmd := exec.Command("make", "deploy") -// cmd.Stdout = os.Stdout -// cmd.Stderr = os.Stderr -// err := cmd.Run() -// if err != nil { -// panic(err) -// } -// log.Info("TiUP restart success") -// } - -func (s *realClusterSuite) deploy(t *testing.T) { +func (s *realClusterSuite) restart() { tag := s.tag() - deployTiupPlayground(t, tag) - waitTiupReady(t, tag) + log.Info("start to restart", zap.String("tag", tag)) + s.stopRealCluster(s.T()) + s.startRealCluster(s.T()) + log.Info("TiUP restart success") } func destroy(t *testing.T, tag string) { - cmdStr := fmt.Sprintf("ps -ef | grep 'tiup playground' | grep %s | awk '{print $2}' | head -n 1", tag) + cmdStr := fmt.Sprintf("ps -ef | grep %s | awk '{print $2}'", tag) cmd := exec.Command("sh", "-c", cmdStr) bytes, err := cmd.Output() require.NoError(t, err) - pid := string(bytes) - // nolint:errcheck - runCommand("sh", "-c", "kill -9 "+pid) - log.Info("destroy success", zap.String("pid", pid)) + pids := string(bytes) + pidArr := strings.Split(pids, "\n") + for _, pid := range pidArr { + // nolint:errcheck + runCommand("sh", "-c", "kill -9 "+pid) + } + log.Info("destroy success", zap.String("tag", tag)) } func deployTiupPlayground(t *testing.T, tag string) { @@ -146,11 +134,11 @@ func deployTiupPlayground(t *testing.T, tag string) { go func() { runCommand("sh", "-c", tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \ - --without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \ - --kv.binpath ./third_bin/tikv-server \ - --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ - --pd.config ./tests/integrations/realcluster/pd.toml \ - > `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `) + --without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \ + --kv.binpath ./third_bin/tikv-server \ + --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ + --pd.config ./tests/integrations/realcluster/pd.toml \ + > `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `) }() // Avoid to change the dir before execute `tiup playground`. 
diff --git a/tests/integrations/realcluster/reboot_pd_test.go b/tests/integrations/realcluster/reboot_pd_test.go index 50b4bee2055..e3f37ac0605 100644 --- a/tests/integrations/realcluster/reboot_pd_test.go +++ b/tests/integrations/realcluster/reboot_pd_test.go @@ -14,45 +14,68 @@ package realcluster +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/http" +) + +type rebootPDSuite struct { + realClusterSuite +} + +func TestRebootPD(t *testing.T) { + suite.Run(t, &rebootPDSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "reboot_pd", + }, + }) +} + // https://github.com/tikv/pd/issues/6467 -// func TestReloadLabel(t *testing.T) { -// re := require.New(t) -// ctx := context.Background() - -// resp, err := pdHTTPCli.GetStores(ctx) -// re.NoError(err) -// re.NotEmpty(resp.Stores) -// firstStore := resp.Stores[0] -// // TiFlash labels will be ["engine": "tiflash"] -// // So we need to merge the labels -// storeLabels := map[string]string{ -// "zone": "zone1", -// } -// for _, label := range firstStore.Store.Labels { -// storeLabels[label.Key] = label.Value -// } -// re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) -// defer func() { -// re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) -// }() - -// checkLabelsAreEqual := func() { -// resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) -// re.NoError(err) - -// labelsMap := make(map[string]string) -// for _, label := range resp.Store.Labels { -// re.NotNil(label) -// labelsMap[label.Key] = label.Value -// } - -// for key, value := range storeLabels { -// re.Equal(value, labelsMap[key]) -// } -// } -// // Check the label is set -// checkLabelsAreEqual() -// // Restart TiUP to reload the label -// restartTiUP() -// checkLabelsAreEqual() -// } +func (s *rebootPDSuite) TestReloadLabel() { + re := require.New(s.T()) + ctx := context.Background() + + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + resp, err := pdHTTPCli.GetStores(ctx) + re.NoError(err) + re.NotEmpty(resp.Stores) + firstStore := resp.Stores[0] + // TiFlash labels will be ["engine": "tiflash"] + // So we need to merge the labels + storeLabels := map[string]string{ + "zone": "zone1", + } + for _, label := range firstStore.Store.Labels { + storeLabels[label.Key] = label.Value + } + re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) + defer func() { + re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) + }() + + checkLabelsAreEqual := func() { + resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) + re.NoError(err) + + labelsMap := make(map[string]string) + for _, label := range resp.Store.Labels { + re.NotNil(label) + labelsMap[label.Key] = label.Value + } + + for key, value := range storeLabels { + re.Equal(value, labelsMap[key]) + } + } + // Check the label is set + checkLabelsAreEqual() + // Restart to reload the label + s.restart() + pdHTTPCli = http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + checkLabelsAreEqual() +} diff --git a/tests/integrations/realcluster/scheduler_test.go b/tests/integrations/realcluster/scheduler_test.go index 7e5087627fb..69da846b491 100644 --- a/tests/integrations/realcluster/scheduler_test.go +++ b/tests/integrations/realcluster/scheduler_test.go @@ -14,161 +14,190 @@ package realcluster +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + 
"github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/testutil" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/types" +) + +type schedulerSuite struct { + realClusterSuite +} + +func TestScheduler(t *testing.T) { + suite.Run(t, &schedulerSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "scheduler", + }, + }) +} + // https://github.com/tikv/pd/issues/6988#issuecomment-1694924611 // https://github.com/tikv/pd/issues/6897 -// func TestTransferLeader(t *testing.T) { -// re := require.New(t) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() - -// resp, err := pdHTTPCli.GetLeader(ctx) -// re.NoError(err) -// oldLeader := resp.Name - -// var newLeader string -// for i := 0; i < 2; i++ { -// if resp.Name != fmt.Sprintf("pd-%d", i) { -// newLeader = fmt.Sprintf("pd-%d", i) -// } -// } - -// // record scheduler -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), 1)) -// defer func() { -// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) -// }() -// res, err := pdHTTPCli.GetSchedulers(ctx) -// re.NoError(err) -// oldSchedulersLen := len(res) - -// re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) -// // wait for transfer leader to new leader -// time.Sleep(1 * time.Second) -// resp, err = pdHTTPCli.GetLeader(ctx) -// re.NoError(err) -// re.Equal(newLeader, resp.Name) - -// res, err = pdHTTPCli.GetSchedulers(ctx) -// re.NoError(err) -// re.Len(res, oldSchedulersLen) - -// // transfer leader to old leader -// re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) -// // wait for transfer leader -// time.Sleep(1 * time.Second) -// resp, err = pdHTTPCli.GetLeader(ctx) -// re.NoError(err) -// re.Equal(oldLeader, resp.Name) - -// res, err = pdHTTPCli.GetSchedulers(ctx) -// re.NoError(err) -// re.Len(res, oldSchedulersLen) -// } - -// func TestRegionLabelDenyScheduler(t *testing.T) { -// re := require.New(t) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() - -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// re.NotEmpty(regions.Regions) -// region1 := regions.Regions[0] - -// err = pdHTTPCli.DeleteScheduler(ctx, types.BalanceLeaderScheduler.String()) -// if err == nil { -// defer func() { -// pdHTTPCli.CreateScheduler(ctx, types.BalanceLeaderScheduler.String(), 0) -// }() -// } - -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) -// defer func() { -// pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) -// }() - -// // wait leader transfer -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.Leader.StoreID != region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) - -// // disable schedule for region1 -// labelRule := &pd.LabelRule{ -// ID: "rule1", -// Labels: []pd.RegionLabel{{Key: "schedule", Value: "deny"}}, -// RuleType: "key-range", -// Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), -// } -// re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) -// defer func() { -// pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) -// }() -// labelRules, err := pdHTTPCli.GetAllRegionLabelRules(ctx) -// re.NoError(err) -// re.Len(labelRules, 2) -// sort.Slice(labelRules, func(i, j 
int) bool { -// return labelRules[i].ID < labelRules[j].ID -// }) -// re.Equal(labelRule.ID, labelRules[1].ID) -// re.Equal(labelRule.Labels, labelRules[1].Labels) -// re.Equal(labelRule.RuleType, labelRules[1].RuleType) - -// // enable evict leader scheduler, and check it works -// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String())) -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), uint64(region1.Leader.StoreID))) -// defer func() { -// pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String()) -// }() -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.Leader.StoreID == region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) - -// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) -// defer func() { -// pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) -// }() -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.ID == region1.ID { -// continue -// } -// if region.Leader.StoreID != region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) - -// pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) -// labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) -// re.NoError(err) -// re.Len(labelRules, 1) - -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.Leader.StoreID != region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) -// } +func (s *schedulerSuite) TestTransferLeader() { + re := require.New(s.T()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + resp, err := pdHTTPCli.GetLeader(ctx) + re.NoError(err) + oldLeader := resp.Name + + var newLeader string + for i := 0; i < 2; i++ { + if resp.Name != fmt.Sprintf("pd-%d", i) { + newLeader = fmt.Sprintf("pd-%d", i) + } + } + + // record scheduler + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), 1)) + defer func() { + re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) + }() + res, err := pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + oldSchedulersLen := len(res) + + re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) + // wait for transfer leader to new leader + time.Sleep(1 * time.Second) + resp, err = pdHTTPCli.GetLeader(ctx) + re.NoError(err) + re.Equal(newLeader, resp.Name) + + res, err = pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + re.Len(res, oldSchedulersLen) + + // transfer leader to old leader + re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) + // wait for transfer leader + time.Sleep(1 * time.Second) + resp, err = pdHTTPCli.GetLeader(ctx) + re.NoError(err) + re.Equal(oldLeader, resp.Name) + + res, err = pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + re.Len(res, oldSchedulersLen) +} + +func (s *schedulerSuite) TestRegionLabelDenyScheduler() { + re := require.New(s.T()) + ctx, cancel 
:= context.WithCancel(context.Background()) + defer cancel() + + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + re.NotEmpty(regions.Regions) + region1 := regions.Regions[0] + + err = pdHTTPCli.DeleteScheduler(ctx, types.BalanceLeaderScheduler.String()) + if err == nil { + defer func() { + pdHTTPCli.CreateScheduler(ctx, types.BalanceLeaderScheduler.String(), 0) + }() + } + + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) + }() + + // wait leader transfer + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + // disable schedule for region1 + labelRule := &http.LabelRule{ + ID: "rule1", + Labels: []http.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: "key-range", + Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), + } + re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) + defer func() { + pdHTTPCli.PatchRegionLabelRules(ctx, &http.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + }() + labelRules, err := pdHTTPCli.GetAllRegionLabelRules(ctx) + re.NoError(err) + re.Len(labelRules, 2) + sort.Slice(labelRules, func(i, j int) bool { + return labelRules[i].ID < labelRules[j].ID + }) + re.Equal(labelRule.ID, labelRules[1].ID) + re.Equal(labelRule.Labels, labelRules[1].Labels) + re.Equal(labelRule.RuleType, labelRules[1].RuleType) + + // enable evict leader scheduler, and check it works + re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String())) + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String()) + }() + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID == region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) + }() + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.ID == region1.ID { + continue + } + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + pdHTTPCli.PatchRegionLabelRules(ctx, &http.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) + re.NoError(err) + re.Len(labelRules, 1) + + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) +} diff --git a/tests/integrations/realcluster/ts_test.go 
b/tests/integrations/realcluster/ts_test.go index 156e3d63e71..f19124d04a4 100644 --- a/tests/integrations/realcluster/ts_test.go +++ b/tests/integrations/realcluster/ts_test.go @@ -14,26 +14,45 @@ package realcluster -// func TestTS(t *testing.T) { -// re := require.New(t) - -// db := OpenTestDB(t) -// db.MustExec("use test") -// db.MustExec("drop table if exists t") -// db.MustExec("create table t(a int, index i(a))") -// db.MustExec("insert t values (1), (2), (3)") -// var rows int -// err := db.inner.Raw("select count(*) from t").Row().Scan(&rows) -// re.NoError(err) -// re.Equal(3, rows) - -// re.NoError(err) -// re.Equal(3, rows) - -// var ts uint64 -// err = db.inner.Begin().Raw("select @@tidb_current_ts").Scan(&ts).Rollback().Error -// re.NoError(err) -// re.NotEqual(0, GetTimeFromTS(ts)) - -// db.MustClose() -// } +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type tsSuite struct { + realClusterSuite +} + +func TestTS(t *testing.T) { + suite.Run(t, &tsSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "ts", + }, + }) +} + +func (s *tsSuite) TestTS() { + re := require.New(s.T()) + + db := OpenTestDB(s.T()) + db.MustExec("use test") + db.MustExec("drop table if exists t") + db.MustExec("create table t(a int, index i(a))") + db.MustExec("insert t values (1), (2), (3)") + var rows int + err := db.inner.Raw("select count(*) from t").Row().Scan(&rows) + re.NoError(err) + re.Equal(3, rows) + + re.NoError(err) + re.Equal(3, rows) + + var ts uint64 + err = db.inner.Begin().Raw("select @@tidb_current_ts").Scan(&ts).Rollback().Error + re.NoError(err) + re.NotEqual(0, GetTimeFromTS(ts)) + + db.MustClose() +} diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index 013c41da7f3..789ceaa29c2 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -22,11 +22,6 @@ import ( const physicalShiftBits = 18 -var ( -// pdAddrs = []string{"http://127.0.0.1:2379"} -// pdHTTPCli = http.NewClient("pd-real-cluster-test", pdAddrs) -) - // GetTimeFromTS extracts time.Time from a timestamp. func GetTimeFromTS(ts uint64) time.Time { ms := ExtractPhysical(ts) From d82e41d09f95b95642059a9a8a073ff2b368cae8 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 23 Oct 2024 09:31:29 +0800 Subject: [PATCH 15/15] statistics: rename `RegionStats` in hot statistics (#8740) ref tikv/pd#4399 Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/cluster.go | 25 ++++++++--------- pkg/mock/mockcluster/mockcluster.go | 16 ++++------- pkg/schedule/coordinator.go | 29 ++++---------------- pkg/schedule/schedulers/hot_region.go | 4 +-- pkg/schedule/schedulers/hot_region_test.go | 32 +++++++++++----------- pkg/statistics/hot_cache.go | 7 +++-- pkg/statistics/hot_peer_cache.go | 6 ++-- pkg/statistics/hot_peer_cache_test.go | 2 +- pkg/statistics/region_stat_informer.go | 8 ++---- server/cluster/cluster_test.go | 22 +++++++-------- server/cluster/scheduling_controller.go | 24 +++++++--------- 11 files changed, 71 insertions(+), 104 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 5885a9cdb84..66cf1e97518 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -184,21 +184,18 @@ func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *sta return c.hotStat.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. 
+// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -// RegionStats is a thread-safe method -func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // As read stats are reported by store heartbeat, the threshold needs to be adjusted. - threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * - (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) - return c.hotStat.RegionStats(utils.Read, threshold) -} - -// RegionWriteStats returns hot region's write stats. -// The result only includes peers that are hot enough. -func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - // RegionStats is a thread-safe method - return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +// GetHotPeerStats is a thread-safe method. +func (c *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() + if rw == utils.Read { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold = c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + } + return c.hotStat.GetHotPeerStats(rw, threshold) } // BucketsStats returns hot region's buckets stats. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index bbd4fbb6811..8d7317f547b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -148,13 +148,6 @@ func (mc *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *st return mc.HotCache.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. -// The result only includes peers that are hot enough. -func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // We directly use threshold for read stats for mockCluster - return mc.HotCache.RegionStats(utils.Read, mc.GetHotRegionCacheHitsThreshold()) -} - // BucketsStats returns hot region's buckets stats. func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat { task := buckets.NewCollectBucketStatsTask(degree, regions...) @@ -164,10 +157,11 @@ func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buc return task.WaitRet(mc.ctx) } -// RegionWriteStats returns hot region's write stats. +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - return mc.HotCache.RegionStats(utils.Write, mc.GetHotRegionCacheHitsThreshold()) +func (mc *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + return mc.HotCache.GetHotPeerStats(rw, mc.GetHotRegionCacheHitsThreshold()) } // HotRegionsFromStore picks hot regions in specify store. @@ -185,7 +179,7 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind utils.RWType) []*core. // hotRegionsFromStore picks hot region in specify store. 
 func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind utils.RWType, minHotDegree int) []*statistics.HotPeerStat {
-	if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
+	if stats, ok := w.GetHotPeerStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
 		return stats
 	}
 	return nil
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 2736c687fdb..344621c8b5b 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -432,16 +432,8 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot
 	isTraceFlow := c.cluster.GetSchedulerConfig().IsTraceRegionFlow()
 	storeLoads := c.cluster.GetStoresLoads()
 	stores := c.cluster.GetStores()
-	var infos *statistics.StoreHotPeersInfos
-	switch typ {
-	case utils.Write:
-		regionStats := c.cluster.RegionWriteStats()
-		infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Write, isTraceFlow)
-	case utils.Read:
-		regionStats := c.cluster.RegionReadStats()
-		infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Read, isTraceFlow)
-	default:
-	}
+	hotPeerStats := c.cluster.GetHotPeerStats(typ)
+	infos := statistics.GetHotStatus(stores, storeLoads, hotPeerStats, typ, isTraceFlow)
 	// update params `IsLearner` and `LastUpdateTime`
 	s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer}
 	for i, stores := range s {
@@ -505,20 +497,9 @@ func (c *Coordinator) CollectHotSpotMetrics() {
 }
 
 func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, typ utils.RWType) {
-	var (
-		kind        string
-		regionStats map[uint64][]*statistics.HotPeerStat
-	)
-
-	switch typ {
-	case utils.Read:
-		regionStats = cluster.RegionReadStats()
-		kind = utils.Read.String()
-	case utils.Write:
-		regionStats = cluster.RegionWriteStats()
-		kind = utils.Write.String()
-	}
-	status := statistics.CollectHotPeerInfos(stores, regionStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count
+	kind := typ.String()
+	hotPeerStats := cluster.GetHotPeerStats(typ)
+	status := statistics.CollectHotPeerInfos(stores, hotPeerStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count
 
 	for _, s := range stores {
 		// TODO: pre-allocate gauge metrics
diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go
index eedbcfe4625..97a558c3fe4 100644
--- a/pkg/schedule/schedulers/hot_region.go
+++ b/pkg/schedule/schedulers/hot_region.go
@@ -130,7 +130,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche
 	// update read statistics
 	// avoid to update read statistics frequently
 	if time.Since(s.updateReadTime) >= statisticsInterval {
-		regionRead := cluster.RegionReadStats()
+		regionRead := cluster.GetHotPeerStats(utils.Read)
 		prepare(regionRead, utils.Read, constant.LeaderKind)
 		prepare(regionRead, utils.Read, constant.RegionKind)
 		s.updateReadTime = time.Now()
@@ -139,7 +139,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche
 	// update write statistics
 	// avoid to update write statistics frequently
 	if time.Since(s.updateWriteTime) >= statisticsInterval {
-		regionWrite := cluster.RegionWriteStats()
+		regionWrite := cluster.GetHotPeerStats(utils.Write)
 		prepare(regionWrite, utils.Write, constant.LeaderKind)
 		prepare(regionWrite, utils.Write, constant.RegionKind)
 		s.updateWriteTime = time.Now()
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index a4b3225312d..195effaecab 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -1297,7 +1297,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) {
 	r := tc.HotRegionsFromStore(2, utils.Read)
 	re.Len(r, 3)
 	// check hot items
-	stats := tc.HotCache.RegionStats(utils.Read, 0)
+	stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
 	re.Len(stats, 3)
 	for _, ss := range stats {
 		for _, s := range ss {
@@ -1623,7 +1623,7 @@ func TestHotCacheUpdateCache(t *testing.T) {
 		// lower than hot read flow rate, but higher than write flow rate
 		{11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0},
 	})
-	stats := tc.RegionStats(utils.Read, 0)
+	stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
 	re.Len(stats[1], 3)
 	re.Len(stats[2], 3)
 	re.Len(stats[3], 3)
@@ -1632,7 +1632,7 @@ func TestHotCacheUpdateCache(t *testing.T) {
 		{3, []uint64{2, 1, 3}, 20 * units.KiB, 0, 0},
 		{11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0},
 	})
-	stats = tc.RegionStats(utils.Read, 0)
+	stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
 	re.Len(stats[1], 3)
 	re.Len(stats[2], 3)
 	re.Len(stats[3], 3)
@@ -1642,7 +1642,7 @@ func TestHotCacheUpdateCache(t *testing.T) {
 		{5, []uint64{1, 2, 3}, 20 * units.KiB, 0, 0},
 		{6, []uint64{1, 2, 3}, 0.8 * units.KiB, 0, 0},
 	})
-	stats = tc.RegionStats(utils.Write, 0)
+	stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[1], 2)
 	re.Len(stats[2], 2)
 	re.Len(stats[3], 2)
@@ -1650,7 +1650,7 @@ func TestHotCacheUpdateCache(t *testing.T) {
 	addRegionInfo(tc, utils.Write, []testRegionInfo{
 		{5, []uint64{1, 2, 5}, 20 * units.KiB, 0, 0},
 	})
-	stats = tc.RegionStats(utils.Write, 0)
+	stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[1], 2)
 	re.Len(stats[2], 2)
 
@@ -1665,7 +1665,7 @@ func TestHotCacheUpdateCache(t *testing.T) {
 		// lower than hot read flow rate, but higher than write flow rate
 		{31, []uint64{4, 5, 6}, 7 * units.KiB, 0, 0},
 	})
-	stats = tc.RegionStats(utils.Read, 0)
+	stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
 	re.Len(stats[4], 2)
 	re.Len(stats[5], 1)
 	re.Empty(stats[6])
@@ -1684,13 +1684,13 @@ func TestHotCacheKeyThresholds(t *testing.T) {
 		{1, []uint64{1, 2, 3}, 0, 1, 0},
 		{2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0},
 	})
-	stats := tc.RegionStats(utils.Read, 0)
+	stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
 	re.Len(stats[1], 1)
 	addRegionInfo(tc, utils.Write, []testRegionInfo{
 		{3, []uint64{4, 5, 6}, 0, 1, 0},
 		{4, []uint64{4, 5, 6}, 0, 1 * units.KiB, 0},
 	})
-	stats = tc.RegionStats(utils.Write, 0)
+	stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[4], 1)
 	re.Len(stats[5], 1)
 	re.Len(stats[6], 1)
@@ -1716,7 +1716,7 @@ func TestHotCacheKeyThresholds(t *testing.T) {
 
 	{ // read
 		addRegionInfo(tc, utils.Read, regions)
-		stats := tc.RegionStats(utils.Read, 0)
+		stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
 		re.Greater(len(stats[1]), 500)
 
 		// for AntiCount
@@ -1724,12 +1724,12 @@ func TestHotCacheKeyThresholds(t *testing.T) {
 		addRegionInfo(tc, utils.Read, regions)
 		addRegionInfo(tc, utils.Read, regions)
 		addRegionInfo(tc, utils.Read, regions)
-		stats = tc.RegionStats(utils.Read, 0)
+		stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
 		re.Len(stats[1], 500)
 	}
 	{ // write
 		addRegionInfo(tc, utils.Write, regions)
-		stats := tc.RegionStats(utils.Write, 0)
+		stats := tc.HotCache.GetHotPeerStats(utils.Write, 0)
 		re.Greater(len(stats[1]), 500)
 		re.Greater(len(stats[2]), 500)
 		re.Greater(len(stats[3]), 500)
@@ -1739,7 +1739,7 @@ func TestHotCacheKeyThresholds(t *testing.T) {
 		addRegionInfo(tc, utils.Write, regions)
 		addRegionInfo(tc, utils.Write, regions)
 		addRegionInfo(tc, utils.Write, regions)
-		stats = tc.RegionStats(utils.Write, 0)
+		stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
 		re.Len(stats[1], 500)
 		re.Len(stats[2], 500)
 		re.Len(stats[3], 500)
@@ -1766,7 +1766,7 @@ func TestHotCacheByteAndKey(t *testing.T) {
 	}
 	{ // read
 		addRegionInfo(tc, utils.Read, regions)
-		stats := tc.RegionStats(utils.Read, 0)
+		stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
 		re.Len(stats[1], 500)
 
 		addRegionInfo(tc, utils.Read, []testRegionInfo{
@@ -1775,12 +1775,12 @@ func TestHotCacheByteAndKey(t *testing.T) {
 			{10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0},
 			{10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0},
 		})
-		stats = tc.RegionStats(utils.Read, 0)
+		stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
 		re.Len(stats[1], 503)
 	}
 	{ // write
 		addRegionInfo(tc, utils.Write, regions)
-		stats := tc.RegionStats(utils.Write, 0)
+		stats := tc.HotCache.GetHotPeerStats(utils.Write, 0)
 		re.Len(stats[1], 500)
 		re.Len(stats[2], 500)
 		re.Len(stats[3], 500)
@@ -1790,7 +1790,7 @@ func TestHotCacheByteAndKey(t *testing.T) {
 			{10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0},
 			{10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0},
 		})
-		stats = tc.RegionStats(utils.Write, 0)
+		stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
 		re.Len(stats[1], 503)
 		re.Len(stats[2], 503)
 		re.Len(stats[3], 503)
diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go
index 86f7d7d6b08..3c0e45e2199 100644
--- a/pkg/statistics/hot_cache.go
+++ b/pkg/statistics/hot_cache.go
@@ -76,11 +76,12 @@ func (w *HotCache) CheckReadAsync(task func(cache *HotPeerCache)) bool {
 	}
 }
 
-// RegionStats returns hot items according to kind
-func (w *HotCache) RegionStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat {
+// GetHotPeerStats returns the read or write statistics for hot regions.
+// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
+func (w *HotCache) GetHotPeerStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat {
 	ret := make(chan map[uint64][]*HotPeerStat, 1)
 	collectRegionStatsTask := func(cache *HotPeerCache) {
-		ret <- cache.RegionStats(minHotDegree)
+		ret <- cache.GetHotPeerStats(minHotDegree)
 	}
 	var succ bool
 	switch kind {
diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go
index 8d1f64ca540..89f767577bd 100644
--- a/pkg/statistics/hot_peer_cache.go
+++ b/pkg/statistics/hot_peer_cache.go
@@ -84,9 +84,9 @@ func NewHotPeerCache(ctx context.Context, kind utils.RWType) *HotPeerCache {
 	}
 }
 
-// TODO: rename RegionStats as PeerStats
-// RegionStats returns hot items
-func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
+// GetHotPeerStats returns the read or write statistics for hot regions.
+// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
+func (f *HotPeerCache) GetHotPeerStats(minHotDegree int) map[uint64][]*HotPeerStat {
 	res := make(map[uint64][]*HotPeerStat)
 	defaultAntiCount := f.kind.DefaultAntiCount()
 	for storeID, peers := range f.peersOfStore {
diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go
index ce4e352bc3d..38a185fa483 100644
--- a/pkg/statistics/hot_peer_cache_test.go
+++ b/pkg/statistics/hot_peer_cache_test.go
@@ -39,7 +39,7 @@ func TestStoreTimeUnsync(t *testing.T) {
 		region := buildRegion(utils.Write, 3, interval)
 		checkAndUpdate(re, cache, region, 3)
 		{
-			stats := cache.RegionStats(0)
+			stats := cache.GetHotPeerStats(0)
 			re.Len(stats, 3)
 			for _, s := range stats {
 				re.Len(s, 1)
diff --git a/pkg/statistics/region_stat_informer.go b/pkg/statistics/region_stat_informer.go
index 4fec5b4aacf..c91ba7be317 100644
--- a/pkg/statistics/region_stat_informer.go
+++ b/pkg/statistics/region_stat_informer.go
@@ -23,10 +23,8 @@ import (
 type RegionStatInformer interface {
 	GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *HotPeerStat
 	IsRegionHot(region *core.RegionInfo) bool
-	// RegionWriteStats return the storeID -> write stat of peers on this store.
+	// GetHotPeerStats returns the read or write statistics for hot regions.
+	// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
 	// The result only includes peers that are hot enough.
-	RegionWriteStats() map[uint64][]*HotPeerStat
-	// RegionReadStats return the storeID -> read stat of peers on this store.
-	// The result only includes peers that are hot enough.
-	RegionReadStats() map[uint64][]*HotPeerStat
+	GetHotPeerStats(rw utils.RWType) map[uint64][]*HotPeerStat
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index a37f9718bdb..c83f485ad3d 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -161,7 +161,7 @@ func TestStoreHeartbeat(t *testing.T) {
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	re.Equal("v1", cluster.GetStore(1).GetStoreLimit().Version())
 	time.Sleep(20 * time.Millisecond)
-	storeStats := cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats := cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Len(storeStats[1], 1)
 	re.Equal(uint64(1), storeStats[1][0].RegionID)
 	interval := float64(hotHeartBeat.Interval.EndTimestamp - hotHeartBeat.Interval.StartTimestamp)
@@ -169,15 +169,15 @@ func TestStoreHeartbeat(t *testing.T) {
 	re.Equal(float64(hotHeartBeat.PeerStats[0].ReadBytes)/interval, storeStats[1][0].Loads[utils.ByteDim])
 	re.Equal(float64(hotHeartBeat.PeerStats[0].ReadKeys)/interval, storeStats[1][0].Loads[utils.KeyDim])
 	re.Equal(float64(hotHeartBeat.PeerStats[0].QueryStats.Get)/interval, storeStats[1][0].Loads[utils.QueryDim])
-	// After cold heartbeat, we won't find region 1 peer in regionStats
+	// After cold heartbeat, we won't find region 1 peer in HotPeerStats
 	re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 1)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 1)
 	re.Empty(storeStats[1])
 	// After hot heartbeat, we can find region 1 peer again
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Len(storeStats[1], 1)
 	re.Equal(uint64(1), storeStats[1][0].RegionID)
 	// after several cold heartbeats, and one hot heartbeat, we also can't find region 1 peer
@@ -185,19 +185,19 @@ func TestStoreHeartbeat(t *testing.T) {
 	re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp))
 	re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 0)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 0)
 	re.Empty(storeStats[1])
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 1)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 1)
 	re.Empty(storeStats[1])
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Empty(storeStats[1])
 	// after 2 hot heartbeats, wo can find region 1 peer again
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Len(storeStats[1], 1)
 	re.Equal(uint64(1), storeStats[1][0].RegionID)
 }
@@ -642,7 +642,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 	re.NoError(err)
 	// wait HotStat to update items
 	time.Sleep(time.Second)
-	stats := cluster.hotStat.RegionStats(utils.Write, 0)
+	stats := cluster.hotStat.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[1], 1)
 	re.Len(stats[2], 1)
 	re.Len(stats[3], 1)
@@ -655,7 +655,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 	re.NoError(err)
 	// wait HotStat to update items
 	time.Sleep(time.Second)
-	stats = cluster.hotStat.RegionStats(utils.Write, 0)
+	stats = cluster.hotStat.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[1], 1)
 	re.Empty(stats[2])
 	re.Len(stats[3], 1)
@@ -2593,7 +2593,7 @@ func TestCollectMetrics(t *testing.T) {
 		rc.collectSchedulingMetrics()
 	}
 	stores := co.GetCluster().GetStores()
-	regionStats := co.GetCluster().RegionWriteStats()
+	regionStats := co.GetCluster().GetHotPeerStats(utils.Write)
 	status1 := statistics.CollectHotPeerInfos(stores, regionStats)
 	status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, utils.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow())
 	for _, s := range status2.AsLeader {
diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go
index 8578b3480d8..49808f4d29d 100644
--- a/server/cluster/scheduling_controller.go
+++ b/server/cluster/scheduling_controller.go
@@ -270,21 +270,17 @@ func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeI
 	return sc.hotStat.GetHotPeerStat(rw, regionID, storeID)
 }
 
-// RegionReadStats returns hot region's read stats.
+// GetHotPeerStats returns the read or write statistics for hot regions.
+// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
 // The result only includes peers that are hot enough.
-// RegionStats is a thread-safe method
-func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
-	// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
-	threshold := sc.opt.GetHotRegionCacheHitsThreshold() *
-		(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
-	return sc.hotStat.RegionStats(utils.Read, threshold)
-}
-
-// RegionWriteStats returns hot region's write stats.
-// The result only includes peers that are hot enough.
-func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
-	// RegionStats is a thread-safe method
-	return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold())
+func (sc *schedulingController) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat {
+	// GetHotPeerStats is a thread-safe method
+	threshold := sc.opt.GetHotRegionCacheHitsThreshold()
+	if rw == utils.Read {
+		threshold = sc.opt.GetHotRegionCacheHitsThreshold() *
+			(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
+	}
+	return sc.hotStat.GetHotPeerStats(rw, threshold)
 }
 
 // BucketsStats returns hot region's buckets stats.
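Illustrative sketch (not part of the patch): the GetHotPeerStats implementations above scale the configured hot-region-cache-hits threshold for reads because read flow arrives with store heartbeats, which are reported more often than region heartbeats, so read peers accumulate cache hits faster. Assuming the usual 60-second region-heartbeat and 10-second store-heartbeat intervals and an illustrative configured threshold of 3 (neither value is stated in this patch), the effective read threshold works out to 3 * (60 / 10) = 18. The standalone helper and package name below are hypothetical, and the import path is assumed from the repository layout; only the branch logic mirrors the patch.

package sketch

import "github.com/tikv/pd/pkg/statistics/utils"

// effectiveThreshold mirrors the read/write branch used by GetHotPeerStats:
// writes use the configured threshold as-is, while reads scale it by the ratio
// of the region-heartbeat interval to the store-heartbeat interval.
func effectiveThreshold(rw utils.RWType, configured int) int {
	if rw == utils.Read {
		return configured * (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
	}
	return configured
}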
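Illustrative sketch (not part of the patch): with the unified getter, call sites such as Coordinator.GetHotRegionsByType and collectHotMetrics no longer branch on read versus write; they request one kind and walk a single map from store ID to hot peers. The provider interface, the dumpHotPeers helper, the package name and the import paths below are assumptions made for the sketch; only the GetHotPeerStats signature, RWType, and the RegionID and Loads fields of HotPeerStat come from the patch.

package sketch

import (
	"fmt"

	"github.com/tikv/pd/pkg/statistics"
	"github.com/tikv/pd/pkg/statistics/utils"
)

// hotPeerStatsProvider is the minimal caller-facing shape after this change.
type hotPeerStatsProvider interface {
	GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat
}

// dumpHotPeers walks both kinds with one loop instead of a read/write switch.
func dumpHotPeers(c hotPeerStatsProvider) {
	for _, rw := range []utils.RWType{utils.Read, utils.Write} {
		for storeID, peers := range c.GetHotPeerStats(rw) {
			for _, peer := range peers {
				fmt.Printf("%s: store %d, region %d, %.1f bytes/s\n",
					rw.String(), storeID, peer.RegionID, peer.Loads[utils.ByteDim])
			}
		}
	}
}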
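Illustrative sketch (not part of the patch): RegionStatInformer shrinks from two methods to a single GetHotPeerStats(rw), so the old call shapes map directly onto the new one (RegionReadStats() becomes GetHotPeerStats(utils.Read), RegionWriteStats() becomes GetHotPeerStats(utils.Write)). A caller outside this repository that still wants the old names could wrap the new getter as sketched below; the shim and package name are hypothetical, and only the method signature is taken from the patch.

package sketch

import (
	"github.com/tikv/pd/pkg/statistics"
	"github.com/tikv/pd/pkg/statistics/utils"
)

// unifiedGetter is the post-patch surface exposed by RegionStatInformer implementers.
type unifiedGetter interface {
	GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat
}

// legacyShim re-exposes the pre-patch method names on top of the unified getter.
type legacyShim struct{ unifiedGetter }

// RegionReadStats maps the old read-side call onto GetHotPeerStats(utils.Read).
func (s legacyShim) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
	return s.GetHotPeerStats(utils.Read)
}

// RegionWriteStats maps the old write-side call onto GetHotPeerStats(utils.Write).
func (s legacyShim) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
	return s.GetHotPeerStats(utils.Write)
}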