diff --git a/.gitignore b/.gitignore index a9d942fe706..08166309a52 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ go.work* embedded_assets_handler.go *.log *.bin +third_bin diff --git a/client/client.go b/client/client.go index 3faa3a09215..9ced7284153 100644 --- a/client/client.go +++ b/client/client.go @@ -36,7 +36,6 @@ import ( "github.com/tikv/pd/client/tlsutil" "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" - "google.golang.org/grpc" ) const ( @@ -174,69 +173,6 @@ type Client interface { Close() } -// GetStoreOp represents available options when getting stores. -type GetStoreOp struct { - excludeTombstone bool -} - -// GetStoreOption configures GetStoreOp. -type GetStoreOption func(*GetStoreOp) - -// WithExcludeTombstone excludes tombstone stores from the result. -func WithExcludeTombstone() GetStoreOption { - return func(op *GetStoreOp) { op.excludeTombstone = true } -} - -// RegionsOp represents available options when operate regions -type RegionsOp struct { - group string - retryLimit uint64 - skipStoreLimit bool -} - -// RegionsOption configures RegionsOp -type RegionsOption func(op *RegionsOp) - -// WithGroup specify the group during Scatter/Split Regions -func WithGroup(group string) RegionsOption { - return func(op *RegionsOp) { op.group = group } -} - -// WithRetry specify the retry limit during Scatter/Split Regions -func WithRetry(retry uint64) RegionsOption { - return func(op *RegionsOp) { op.retryLimit = retry } -} - -// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions -func WithSkipStoreLimit() RegionsOption { - return func(op *RegionsOp) { op.skipStoreLimit = true } -} - -// GetRegionOp represents available options when getting regions. -type GetRegionOp struct { - needBuckets bool - allowFollowerHandle bool - outputMustContainAllKeyRange bool -} - -// GetRegionOption configures GetRegionOp. -type GetRegionOption func(op *GetRegionOp) - -// WithBuckets means getting region and its buckets. -func WithBuckets() GetRegionOption { - return func(op *GetRegionOp) { op.needBuckets = true } -} - -// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. -func WithAllowFollowerHandle() GetRegionOption { - return func(op *GetRegionOp) { op.allowFollowerHandle = true } -} - -// WithOutputMustContainAllKeyRange means the output must contain all key ranges. -func WithOutputMustContainAllKeyRange() GetRegionOption { - return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } -} - var ( // errUnmatchedClusterID is returned when found a PD with a different cluster ID. errUnmatchedClusterID = errors.New("[pd] unmatched cluster id") @@ -250,60 +186,6 @@ var ( errNoServiceModeReturned = errors.New("[pd] no service mode returned") ) -// ClientOption configures client. -type ClientOption func(c *client) - -// WithGRPCDialOptions configures the client with gRPC dial options. -func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { - return func(c *client) { - c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) - } -} - -// WithCustomTimeoutOption configures the client with timeout option. -func WithCustomTimeoutOption(timeout time.Duration) ClientOption { - return func(c *client) { - c.option.timeout = timeout - } -} - -// WithForwardingOption configures the client with forwarding option. 
-func WithForwardingOption(enableForwarding bool) ClientOption { - return func(c *client) { - c.option.enableForwarding = enableForwarding - } -} - -// WithTSOServerProxyOption configures the client to use TSO server proxy, -// i.e., the client will send TSO requests to the API leader (the TSO server -// proxy) which will forward the requests to the TSO servers. -func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { - return func(c *client) { - c.option.useTSOServerProxy = useTSOServerProxy - } -} - -// WithMaxErrorRetry configures the client max retry times when connect meets error. -func WithMaxErrorRetry(count int) ClientOption { - return func(c *client) { - c.option.maxRetryTimes = count - } -} - -// WithMetricsLabels configures the client with metrics labels. -func WithMetricsLabels(labels prometheus.Labels) ClientOption { - return func(c *client) { - c.option.metricsLabels = labels - } -} - -// WithInitMetricsOption configures the client with metrics labels. -func WithInitMetricsOption(initMetrics bool) ClientOption { - return func(c *client) { - c.option.initMetrics = initMetrics - } -} - var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. @@ -324,9 +206,7 @@ func (k *serviceModeKeeper) close() { k.tsoSvcDiscovery.Close() fallthrough case pdpb.ServiceMode_PD_SVC_MODE: - if k.tsoClient != nil { - k.tsoClient.close() - } + k.tsoClient.close() case pdpb.ServiceMode_UNKNOWN_SVC_MODE: } } @@ -560,6 +440,7 @@ func newClientWithKeyspaceName( } clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ + keyspaceID: nullKeyspaceID, updateTokenConnectionCh: make(chan struct{}, 1), ctx: clientCtx, cancel: clientCancel, @@ -573,10 +454,12 @@ func newClientWithKeyspaceName( opt(c) } - updateKeyspaceIDCb := func() error { - if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { + updateKeyspaceIDFunc := func() error { + keyspaceMeta, err := c.LoadKeyspace(clientCtx, keyspaceName) + if err != nil { return err } + c.keyspaceID = keyspaceMeta.GetId() // c.keyspaceID is the source of truth for keyspace id. c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) return nil @@ -584,8 +467,8 @@ func newClientWithKeyspaceName( // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, // finally update the keyspace id to the PD service discovery for the following interactions. - c.pdSvcDiscovery = newPDServiceDiscovery( - clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) + c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, + c.setServiceMode, updateKeyspaceIDFunc, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() if c.pdSvcDiscovery != nil { @@ -600,32 +483,6 @@ func newClientWithKeyspaceName( return c, nil } -func (c *client) initRetry(f func(s string) error, str string) error { - var err error - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for i := 0; i < c.option.maxRetryTimes; i++ { - if err = f(str); err == nil { - return nil - } - select { - case <-c.ctx.Done(): - return err - case <-ticker.C: - } - } - return errors.WithStack(err) -} - -func (c *client) loadKeyspaceMeta(keyspace string) error { - keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace) - if err != nil { - return err - } - c.keyspaceID = keyspaceMeta.GetId() - return nil -} - func (c *client) setup() error { // Init the metrics. 
if c.option.initMetrics { @@ -697,7 +554,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = newTSOServiceDiscovery( c.ctx, MetaStorageClient(c), c.pdSvcDiscovery, - c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option) + c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. newTSOCli = newTSOClient(c.ctx, c.option, diff --git a/client/option.go b/client/option.go index 3f2b7119b52..ca21dcfefbf 100644 --- a/client/option.go +++ b/client/option.go @@ -142,3 +142,120 @@ func (o *option) setTSOClientRPCConcurrency(value int) { func (o *option) getTSOClientRPCConcurrency() int { return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) } + +// GetStoreOp represents available options when getting stores. +type GetStoreOp struct { + excludeTombstone bool +} + +// GetStoreOption configures GetStoreOp. +type GetStoreOption func(*GetStoreOp) + +// WithExcludeTombstone excludes tombstone stores from the result. +func WithExcludeTombstone() GetStoreOption { + return func(op *GetStoreOp) { op.excludeTombstone = true } +} + +// RegionsOp represents available options when operate regions +type RegionsOp struct { + group string + retryLimit uint64 + skipStoreLimit bool +} + +// RegionsOption configures RegionsOp +type RegionsOption func(op *RegionsOp) + +// WithGroup specify the group during Scatter/Split Regions +func WithGroup(group string) RegionsOption { + return func(op *RegionsOp) { op.group = group } +} + +// WithRetry specify the retry limit during Scatter/Split Regions +func WithRetry(retry uint64) RegionsOption { + return func(op *RegionsOp) { op.retryLimit = retry } +} + +// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions +func WithSkipStoreLimit() RegionsOption { + return func(op *RegionsOp) { op.skipStoreLimit = true } +} + +// GetRegionOp represents available options when getting regions. +type GetRegionOp struct { + needBuckets bool + allowFollowerHandle bool + outputMustContainAllKeyRange bool +} + +// GetRegionOption configures GetRegionOp. +type GetRegionOption func(op *GetRegionOp) + +// WithBuckets means getting region and its buckets. +func WithBuckets() GetRegionOption { + return func(op *GetRegionOp) { op.needBuckets = true } +} + +// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. +func WithAllowFollowerHandle() GetRegionOption { + return func(op *GetRegionOp) { op.allowFollowerHandle = true } +} + +// WithOutputMustContainAllKeyRange means the output must contain all key ranges. +func WithOutputMustContainAllKeyRange() GetRegionOption { + return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } +} + +// ClientOption configures client. +type ClientOption func(c *client) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(c *client) { + c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) + } +} + +// WithCustomTimeoutOption configures the client with timeout option. +func WithCustomTimeoutOption(timeout time.Duration) ClientOption { + return func(c *client) { + c.option.timeout = timeout + } +} + +// WithForwardingOption configures the client with forwarding option. 
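The client option helpers being moved here (GetStoreOption, RegionsOption, GetRegionOption, ClientOption) are all instances of Go's functional-options pattern, relocated verbatim from client.go to client/option.go. For readers less familiar with the pattern, here is a minimal self-contained sketch of how such options compose; the lower-case names below are illustrative only and not part of the pd client API:

```go
package main

import "fmt"

// getRegionOp mirrors the shape of GetRegionOp: a bag of flags that
// callers toggle through option functions.
type getRegionOp struct {
	needBuckets         bool
	allowFollowerHandle bool
}

// getRegionOption mutates a getRegionOp; each with* helper returns one.
type getRegionOption func(*getRegionOp)

func withBuckets() getRegionOption {
	return func(op *getRegionOp) { op.needBuckets = true }
}

func withAllowFollowerHandle() getRegionOption {
	return func(op *getRegionOp) { op.allowFollowerHandle = true }
}

// getRegion shows how an API entry point folds the options into defaults.
func getRegion(key string, opts ...getRegionOption) {
	op := &getRegionOp{}
	for _, opt := range opts {
		opt(op)
	}
	fmt.Printf("get region %q: buckets=%v follower=%v\n", key, op.needBuckets, op.allowFollowerHandle)
}

func main() {
	getRegion("k1")
	getRegion("k2", withBuckets(), withAllowFollowerHandle())
}
```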
+func WithForwardingOption(enableForwarding bool) ClientOption { + return func(c *client) { + c.option.enableForwarding = enableForwarding + } +} + +// WithTSOServerProxyOption configures the client to use TSO server proxy, +// i.e., the client will send TSO requests to the API leader (the TSO server +// proxy) which will forward the requests to the TSO servers. +func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { + return func(c *client) { + c.option.useTSOServerProxy = useTSOServerProxy + } +} + +// WithMaxErrorRetry configures the client max retry times when connect meets error. +func WithMaxErrorRetry(count int) ClientOption { + return func(c *client) { + c.option.maxRetryTimes = count + } +} + +// WithMetricsLabels configures the client with metrics labels. +func WithMetricsLabels(labels prometheus.Labels) ClientOption { + return func(c *client) { + c.option.metricsLabels = labels + } +} + +// WithInitMetricsOption configures the client with metrics labels. +func WithInitMetricsOption(initMetrics bool) ClientOption { + return func(c *client) { + c.option.initMetrics = initMetrics + } +} diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index c34a5bebac6..83bc8e612a3 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -440,9 +440,9 @@ type pdServiceDiscovery struct { cancel context.CancelFunc closeOnce sync.Once - updateKeyspaceIDCb updateKeyspaceIDFunc - keyspaceID uint32 - tlsCfg *tls.Config + updateKeyspaceIDFunc updateKeyspaceIDFunc + keyspaceID uint32 + tlsCfg *tls.Config // Client option. option *option } @@ -461,21 +461,21 @@ func newPDServiceDiscovery( ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, serviceModeUpdateCb func(pdpb.ServiceMode), - updateKeyspaceIDCb updateKeyspaceIDFunc, + updateKeyspaceIDFunc updateKeyspaceIDFunc, keyspaceID uint32, urls []string, tlsCfg *tls.Config, option *option, ) *pdServiceDiscovery { pdsd := &pdServiceDiscovery{ - checkMembershipCh: make(chan struct{}, 1), - ctx: ctx, - cancel: cancel, - wg: wg, - apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, - serviceModeUpdateCb: serviceModeUpdateCb, - updateKeyspaceIDCb: updateKeyspaceIDCb, - keyspaceID: keyspaceID, - tlsCfg: tlsCfg, - option: option, + checkMembershipCh: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + wg: wg, + apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, + serviceModeUpdateCb: serviceModeUpdateCb, + updateKeyspaceIDFunc: updateKeyspaceIDFunc, + keyspaceID: keyspaceID, + tlsCfg: tlsCfg, + option: option, } urls = addrsToURLs(urls, tlsCfg) pdsd.urls.Store(urls) @@ -500,8 +500,8 @@ func (c *pdServiceDiscovery) Init() error { // We need to update the keyspace ID before we discover and update the service mode // so that TSO in API mode can be initialized with the correct keyspace ID. - if c.updateKeyspaceIDCb != nil { - if err := c.updateKeyspaceIDCb(); err != nil { + if c.keyspaceID == nullKeyspaceID && c.updateKeyspaceIDFunc != nil { + if err := c.initRetry(c.updateKeyspaceIDFunc); err != nil { return err } } @@ -634,6 +634,9 @@ func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) { // Close releases all resources. 
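With this change, pdServiceDiscovery.Init retries updateKeyspaceIDFunc through its own initRetry, and only while the keyspace ID is still the null placeholder, instead of having the client wrap the callback in its now-removed initRetry/loadKeyspaceMeta pair. A self-contained sketch of the same ticker-plus-context retry shape, with illustrative names and retry budget:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// initRetry keeps calling f once per second until it succeeds, the retry
// budget is exhausted, or the context is canceled; this mirrors the shape
// of the loop that moved from client.go into the service discovery.
func initRetry(ctx context.Context, maxRetryTimes int, f func() error) error {
	var err error
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < maxRetryTimes; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-ticker.C:
		}
	}
	return err
}

func main() {
	attempts := 0
	err := initRetry(context.Background(), 5, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("keyspace meta not ready")
		}
		return nil // e.g. the keyspace ID was successfully loaded
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```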
func (c *pdServiceDiscovery) Close() { + if c == nil { + return + } c.closeOnce.Do(func() { log.Info("[pd] close pd service discovery client") c.clientConns.Range(func(key, cc any) bool { diff --git a/client/tso_client.go b/client/tso_client.go index f1538a7f164..6801aee3a11 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -386,7 +386,9 @@ func (c *tsoClient) tryConnectToTSO( cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { // Only store the `connectionCtx` if it does not exist before. - connectionCtxs.LoadOrStore(newURL, connectionCtx) + if connectionCtx != nil { + connectionCtxs.LoadOrStore(newURL, connectionCtx) + } // Remove all other `connectionCtx`s. connectionCtxs.Range(func(url, cc any) bool { if url.(string) != newURL { @@ -405,6 +407,8 @@ func (c *tsoClient) tryConnectToTSO( c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) if _, ok := connectionCtxs.Load(url); ok { + // Just trigger the clean up of the stale connection contexts. + updateAndClear(url, nil) return nil } if cc != nil { diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 443d455e911..0380ddb4c28 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -158,7 +158,7 @@ type tsoServiceDiscovery struct { // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service. func newTSOServiceDiscovery( ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery, - clusterID uint64, keyspaceID uint32, tlsCfg *tls.Config, option *option, + keyspaceID uint32, tlsCfg *tls.Config, option *option, ) ServiceDiscovery { ctx, cancel := context.WithCancel(ctx) c := &tsoServiceDiscovery{ @@ -166,7 +166,7 @@ func newTSOServiceDiscovery( cancel: cancel, metacli: metacli, apiSvcDiscovery: apiSvcDiscovery, - clusterID: clusterID, + clusterID: apiSvcDiscovery.GetClusterID(), tlsCfg: tlsCfg, option: option, checkMembershipCh: make(chan struct{}, 1), @@ -180,10 +180,10 @@ func newTSOServiceDiscovery( c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)} // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, // will be discovered later. 
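Both discovery Close methods gain an `if c == nil { return }` guard. Calling a method on a typed nil pointer is legal in Go as long as the receiver is not dereferenced, so the guard makes Close a no-op on a discovery client that was never fully constructed rather than a panic. A tiny sketch with a hypothetical type:

```go
package main

import "fmt"

type discovery struct {
	name string
}

// Close is safe to call on a nil *discovery thanks to the guard: the
// method is dispatched on the pointer type, and we return before
// touching any field.
func (d *discovery) Close() {
	if d == nil {
		return
	}
	fmt.Println("closing", d.name)
}

func main() {
	var d *discovery // never initialized, e.g. setup failed early
	d.Close()        // no-op instead of a nil-pointer panic
	(&discovery{name: "pd"}).Close()
}
```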
- c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID) + c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, c.clusterID, defaultKeySpaceGroupID) log.Info("created tso service discovery", - zap.Uint64("cluster-id", clusterID), + zap.Uint64("cluster-id", c.clusterID), zap.Uint32("keyspace-id", keyspaceID), zap.String("default-discovery-key", c.defaultDiscoveryKey)) @@ -226,6 +226,9 @@ func (c *tsoServiceDiscovery) retry( // Close releases all resources func (c *tsoServiceDiscovery) Close() { + if c == nil { + return + } log.Info("closing tso service discovery") c.cancel() diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b1abe4677df..815e04f5c54 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -68,7 +68,7 @@ func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo) if c.GetRegionStats() != nil { c.GetRegionStats().ClearDefunctRegion(item.GetID()) } - c.GetLabelStats().ClearDefunctRegion(item.GetID()) + c.GetLabelStats().MarkDefunctRegion(item.GetID()) c.GetRuleManager().InvalidCache(item.GetID()) } } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index dd7151a8b05..618b7ac6dba 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -451,7 +451,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { sqlCPUCost.DeleteLabelValues(r.name, r.name, r.ruType) requestCount.DeleteLabelValues(r.name, r.name, readTypeLabel) requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel) - availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType) + availableRUCounter.DeleteLabelValues(r.name, r.name) delete(m.consumptionRecord, r) delete(maxPerSecTrackers, r.name) readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 4e5777665ac..03b133e0488 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -184,21 +184,18 @@ func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *sta return c.hotStat.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -// RegionStats is a thread-safe method -func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // As read stats are reported by store heartbeat, the threshold needs to be adjusted. - threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * - (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) - return c.hotStat.RegionStats(utils.Read, threshold) -} - -// RegionWriteStats returns hot region's write stats. -// The result only includes peers that are hot enough. -func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - // RegionStats is a thread-safe method - return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +// GetHotPeerStats is a thread-safe method. +func (c *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() + if rw == utils.Read { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. 
+ threshold = c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + } + return c.hotStat.GetHotPeerStats(rw, threshold) } // BucketsStats returns hot region's buckets stats. @@ -379,13 +376,12 @@ func (c *Cluster) waitSchedulersInitialized() { } } -// TODO: implement the following methods - // UpdateRegionsLabelLevelStats updates the status of the region label level by types. func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) } + c.labelStats.ClearDefunctRegions() } func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index ebbec692191..b69c035ca90 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -149,13 +149,6 @@ func (mc *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *st return mc.HotCache.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. -// The result only includes peers that are hot enough. -func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // We directly use threshold for read stats for mockCluster - return mc.HotCache.RegionStats(utils.Read, mc.GetHotRegionCacheHitsThreshold()) -} - // BucketsStats returns hot region's buckets stats. func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat { task := buckets.NewCollectBucketStatsTask(degree, regions...) @@ -165,10 +158,11 @@ func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buc return task.WaitRet(mc.ctx) } -// RegionWriteStats returns hot region's write stats. +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - return mc.HotCache.RegionStats(utils.Write, mc.GetHotRegionCacheHitsThreshold()) +func (mc *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + return mc.HotCache.GetHotPeerStats(rw, mc.GetHotRegionCacheHitsThreshold()) } // HotRegionsFromStore picks hot regions in specify store. @@ -186,7 +180,7 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind utils.RWType) []*core. // hotRegionsFromStore picks hot region in specify store. 
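RegionReadStats and RegionWriteStats collapse into a single GetHotPeerStats(rw) whose only difference is the hot-degree threshold: read flow is reported with store heartbeats, which arrive more often than region heartbeats, so the configured cache-hits threshold is scaled up by the ratio of the two report intervals. A standalone sketch of that scaling; the interval constants mirror the usual PD defaults but are hard-coded here purely for illustration:

```go
package main

import "fmt"

// Report intervals in seconds; these mirror the common PD defaults but
// are fixed here only for the example.
const (
	regionHeartBeatReportInterval = 60
	storeHeartBeatReportInterval  = 10
)

type rwType int

const (
	read rwType = iota
	write
)

// hotDegreeThreshold scales the configured cache-hits threshold for reads,
// because read flow is sampled once per store heartbeat while hot degree
// is counted in region-heartbeat ticks.
func hotDegreeThreshold(rw rwType, cacheHitsThreshold int) int {
	if rw == read {
		return cacheHitsThreshold * (regionHeartBeatReportInterval / storeHeartBeatReportInterval)
	}
	return cacheHitsThreshold
}

func main() {
	fmt.Println("write threshold:", hotDegreeThreshold(write, 3)) // 3
	fmt.Println("read threshold:", hotDegreeThreshold(read, 3))   // 18
}
```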
func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind utils.RWType, minHotDegree int) []*statistics.HotPeerStat { - if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { + if stats, ok := w.GetHotPeerStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { return stats } return nil diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 2736c687fdb..344621c8b5b 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -432,16 +432,8 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot isTraceFlow := c.cluster.GetSchedulerConfig().IsTraceRegionFlow() storeLoads := c.cluster.GetStoresLoads() stores := c.cluster.GetStores() - var infos *statistics.StoreHotPeersInfos - switch typ { - case utils.Write: - regionStats := c.cluster.RegionWriteStats() - infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Write, isTraceFlow) - case utils.Read: - regionStats := c.cluster.RegionReadStats() - infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Read, isTraceFlow) - default: - } + hotPeerStats := c.cluster.GetHotPeerStats(typ) + infos := statistics.GetHotStatus(stores, storeLoads, hotPeerStats, typ, isTraceFlow) // update params `IsLearner` and `LastUpdateTime` s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer} for i, stores := range s { @@ -505,20 +497,9 @@ func (c *Coordinator) CollectHotSpotMetrics() { } func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, typ utils.RWType) { - var ( - kind string - regionStats map[uint64][]*statistics.HotPeerStat - ) - - switch typ { - case utils.Read: - regionStats = cluster.RegionReadStats() - kind = utils.Read.String() - case utils.Write: - regionStats = cluster.RegionWriteStats() - kind = utils.Write.String() - } - status := statistics.CollectHotPeerInfos(stores, regionStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count + kind := typ.String() + hotPeerStats := cluster.GetHotPeerStats(typ) + status := statistics.CollectHotPeerInfos(stores, hotPeerStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count for _, s := range stores { // TODO: pre-allocate gauge metrics diff --git a/pkg/schedule/labeler/labeler_test.go b/pkg/schedule/labeler/labeler_test.go index bd51bab7d83..34490a7249d 100644 --- a/pkg/schedule/labeler/labeler_test.go +++ b/pkg/schedule/labeler/labeler_test.go @@ -341,7 +341,7 @@ func TestKeyRange(t *testing.T) { func TestLabelerRuleTTL(t *testing.T) { re := require.New(t) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - labeler, err := NewRegionLabeler(context.Background(), store, time.Millisecond*10) + labeler, err := NewRegionLabeler(context.Background(), store, time.Minute) re.NoError(err) rules := []*LabelRule{ { diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index eedbcfe4625..97a558c3fe4 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -130,7 +130,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche // update read statistics // avoid to update read statistics frequently if time.Since(s.updateReadTime) >= statisticsInterval { - regionRead := cluster.RegionReadStats() + regionRead := cluster.GetHotPeerStats(utils.Read) prepare(regionRead, utils.Read, constant.LeaderKind) prepare(regionRead, utils.Read, constant.RegionKind) s.updateReadTime = time.Now() @@ -139,7 +139,7 @@ func (s 
*baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche // update write statistics // avoid to update write statistics frequently if time.Since(s.updateWriteTime) >= statisticsInterval { - regionWrite := cluster.RegionWriteStats() + regionWrite := cluster.GetHotPeerStats(utils.Write) prepare(regionWrite, utils.Write, constant.LeaderKind) prepare(regionWrite, utils.Write, constant.RegionKind) s.updateWriteTime = time.Now() diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 0f2be699154..d8da20d8111 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1297,7 +1297,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) { r := tc.HotRegionsFromStore(2, utils.Read) re.Len(r, 3) // check hot items - stats := tc.HotCache.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats, 3) for _, ss := range stats { for _, s := range ss { @@ -1626,7 +1626,7 @@ func TestHotCacheUpdateCache(t *testing.T) { // lower than hot read flow rate, but higher than write flow rate {11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0}, }) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 3) re.Len(stats[2], 3) re.Len(stats[3], 3) @@ -1635,7 +1635,7 @@ func TestHotCacheUpdateCache(t *testing.T) { {3, []uint64{2, 1, 3}, 20 * units.KiB, 0, 0}, {11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 3) re.Len(stats[2], 3) re.Len(stats[3], 3) @@ -1645,7 +1645,7 @@ func TestHotCacheUpdateCache(t *testing.T) { {5, []uint64{1, 2, 3}, 20 * units.KiB, 0, 0}, {6, []uint64{1, 2, 3}, 0.8 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 2) re.Len(stats[2], 2) re.Len(stats[3], 2) @@ -1653,7 +1653,7 @@ func TestHotCacheUpdateCache(t *testing.T) { addRegionInfo(tc, utils.Write, []testRegionInfo{ {5, []uint64{1, 2, 5}, 20 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 2) re.Len(stats[2], 2) @@ -1668,7 +1668,7 @@ func TestHotCacheUpdateCache(t *testing.T) { // lower than hot read flow rate, but higher than write flow rate {31, []uint64{4, 5, 6}, 7 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[4], 2) re.Len(stats[5], 1) re.Empty(stats[6]) @@ -1690,13 +1690,13 @@ func TestHotCacheKeyThresholds(t *testing.T) { {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, }) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 1) addRegionInfo(tc, utils.Write, []testRegionInfo{ {3, []uint64{4, 5, 6}, 0, 1, 0}, {4, []uint64{4, 5, 6}, 0, 1 * units.KiB, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[4], 1) re.Len(stats[5], 1) re.Len(stats[6], 1) @@ -1725,7 +1725,7 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // read addRegionInfo(tc, utils.Read, regions) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Greater(len(stats[1]), 500) // for AntiCount @@ -1733,12 +1733,12 @@ func TestHotCacheKeyThresholds(t *testing.T) { addRegionInfo(tc, utils.Read, regions) addRegionInfo(tc, 
utils.Read, regions) addRegionInfo(tc, utils.Read, regions) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 500) } { // write addRegionInfo(tc, utils.Write, regions) - stats := tc.RegionStats(utils.Write, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Greater(len(stats[1]), 500) re.Greater(len(stats[2]), 500) re.Greater(len(stats[3]), 500) @@ -1748,7 +1748,7 @@ func TestHotCacheKeyThresholds(t *testing.T) { addRegionInfo(tc, utils.Write, regions) addRegionInfo(tc, utils.Write, regions) addRegionInfo(tc, utils.Write, regions) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 500) re.Len(stats[2], 500) re.Len(stats[3], 500) @@ -1778,7 +1778,7 @@ func TestHotCacheByteAndKey(t *testing.T) { } { // read addRegionInfo(tc, utils.Read, regions) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 500) addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1787,12 +1787,12 @@ func TestHotCacheByteAndKey(t *testing.T) { {10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0}, {10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0}, }) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 503) } { // write addRegionInfo(tc, utils.Write, regions) - stats := tc.RegionStats(utils.Write, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 500) re.Len(stats[2], 500) re.Len(stats[3], 500) @@ -1802,7 +1802,7 @@ func TestHotCacheByteAndKey(t *testing.T) { {10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0}, {10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 503) re.Len(stats[2], 503) re.Len(stats[3], 503) diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 046313b4d1d..ae61063646d 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -76,11 +76,12 @@ func (w *HotCache) CheckReadAsync(task func(cache *HotPeerCache)) bool { } } -// RegionStats returns hot items according to kind -func (w *HotCache) RegionStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat { +// RegionStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. +func (w *HotCache) GetHotPeerStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat { ret := make(chan map[uint64][]*HotPeerStat, 1) collectRegionStatsTask := func(cache *HotPeerCache) { - ret <- cache.PeerStats(minHotDegree) + ret <- cache.GetHotPeerStats(minHotDegree) } var succ bool switch kind { diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 90b106394c1..19fc586e6e6 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -86,8 +86,9 @@ func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils } } -// PeerStats returns hot items -func (f *HotPeerCache) PeerStats(minHotDegree int) map[uint64][]*HotPeerStat { +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. 
+func (f *HotPeerCache) GetHotPeerStats(minHotDegree int) map[uint64][]*HotPeerStat { res := make(map[uint64][]*HotPeerStat) defaultAntiCount := f.kind.DefaultAntiCount() for storeID, peers := range f.peersOfStore { diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 30197dd43ea..7e51a8a7bdd 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -365,6 +365,7 @@ type LabelStatistics struct { syncutil.RWMutex regionLabelStats map[uint64]string labelCounter map[string]int + defunctRegions map[uint64]struct{} } // NewLabelStatistics creates a new LabelStatistics. @@ -372,6 +373,7 @@ func NewLabelStatistics() *LabelStatistics { return &LabelStatistics{ regionLabelStats: make(map[uint64]string), labelCounter: make(map[string]int), + defunctRegions: make(map[uint64]struct{}), } } @@ -405,14 +407,26 @@ func ResetLabelStatsMetrics() { regionLabelLevelGauge.Reset() } -// ClearDefunctRegion is used to handle the overlap region. -func (l *LabelStatistics) ClearDefunctRegion(regionID uint64) { +// MarkDefunctRegion is used to handle the overlap region. +// It is used to mark the region as defunct and remove it from the label statistics later. +func (l *LabelStatistics) MarkDefunctRegion(regionID uint64) { l.Lock() defer l.Unlock() - if label, ok := l.regionLabelStats[regionID]; ok { - l.labelCounter[label]-- - delete(l.regionLabelStats, regionID) + l.defunctRegions[regionID] = struct{}{} +} + +// ClearDefunctRegions is used to handle the overlap region. +// It is used to remove the defunct regions from the label statistics. +func (l *LabelStatistics) ClearDefunctRegions() { + l.Lock() + defer l.Unlock() + for regionID := range l.defunctRegions { + if label, ok := l.regionLabelStats[regionID]; ok { + l.labelCounter[label]-- + delete(l.regionLabelStats, regionID) + } } + l.defunctRegions = make(map[uint64]struct{}) } // GetLabelCounter is only used for tests. diff --git a/pkg/statistics/region_stat_informer.go b/pkg/statistics/region_stat_informer.go index 4fec5b4aacf..c91ba7be317 100644 --- a/pkg/statistics/region_stat_informer.go +++ b/pkg/statistics/region_stat_informer.go @@ -23,10 +23,8 @@ import ( type RegionStatInformer interface { GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *HotPeerStat IsRegionHot(region *core.RegionInfo) bool - // RegionWriteStats return the storeID -> write stat of peers on this store. + // GetHotPeerStats return the read or write statistics for hot regions. + // It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. - RegionWriteStats() map[uint64][]*HotPeerStat - // RegionReadStats return the storeID -> read stat of peers on this store. - // The result only includes peers that are hot enough. 
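ClearDefunctRegion's immediate per-region deletion becomes a mark/clear pair: HandleOverlaps now only records the overlapped region IDs via MarkDefunctRegion, and the next UpdateRegionsLabelLevelStats pass sweeps them with ClearDefunctRegions, so a region that was merged away but still shows up in the same patrol batch is not left double-counted. A minimal sketch of the same mark-then-sweep bookkeeping with simplified types:

```go
package main

import (
	"fmt"
	"sync"
)

// labelStats is a trimmed-down stand-in for LabelStatistics: a per-region
// label plus a set of regions marked defunct by overlap handling.
type labelStats struct {
	sync.Mutex
	regionLabel    map[uint64]string
	labelCounter   map[string]int
	defunctRegions map[uint64]struct{}
}

func newLabelStats() *labelStats {
	return &labelStats{
		regionLabel:    make(map[uint64]string),
		labelCounter:   make(map[string]int),
		defunctRegions: make(map[uint64]struct{}),
	}
}

func (l *labelStats) observe(regionID uint64, label string) {
	l.Lock()
	defer l.Unlock()
	if old, ok := l.regionLabel[regionID]; ok {
		l.labelCounter[old]--
	}
	l.regionLabel[regionID] = label
	l.labelCounter[label]++
}

// markDefunct only records the ID; nothing is deleted yet.
func (l *labelStats) markDefunct(regionID uint64) {
	l.Lock()
	defer l.Unlock()
	l.defunctRegions[regionID] = struct{}{}
}

// clearDefunct sweeps everything marked since the last sweep.
func (l *labelStats) clearDefunct() {
	l.Lock()
	defer l.Unlock()
	for id := range l.defunctRegions {
		if label, ok := l.regionLabel[id]; ok {
			l.labelCounter[label]--
			delete(l.regionLabel, id)
		}
	}
	l.defunctRegions = make(map[uint64]struct{})
}

func main() {
	s := newLabelStats()
	s.observe(9, "zone")
	s.observe(10, "zone")
	s.markDefunct(10)     // region 10 was merged away (overlap)
	s.observe(10, "zone") // the same patrol batch may still report it
	s.clearDefunct()
	fmt.Println(s.labelCounter["zone"]) // 1: only region 9 remains counted
}
```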
- RegionReadStats() map[uint64][]*HotPeerStat + GetHotPeerStats(rw utils.RWType) map[uint64][]*HotPeerStat } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index a94f5c41f3f..bf7f91cfc60 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -104,9 +104,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Info("region syncer start load region") start := time.Now() err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion) - log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start))) if err != nil { - log.Warn("failed to load regions", errs.ZapError(err)) + log.Warn("region syncer failed to load regions", errs.ZapError(err), zap.Duration("time-cost", time.Since(start))) + } else { + log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start))) } // establish client. conn := grpcutil.CreateClientConn(ctx, addr, s.tlsConfig, diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 56ee8313d57..a02d4884e17 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -202,7 +202,6 @@ func NewAllocatorManager( rootPath string, storage endpoint.TSOStorage, cfg Config, - startGlobalLeaderLoop bool, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -224,7 +223,7 @@ func NewAllocatorManager( am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. - am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop) + am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership()) am.svcLoopWG.Add(1) go am.tsoAllocatorLoop() @@ -234,11 +233,11 @@ func NewAllocatorManager( // SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into // an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times // depending on the election. 
-func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) { +func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) { am.mu.Lock() defer am.mu.Unlock() - allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop) + allocator := NewGlobalTSOAllocator(ctx, am) // Create a new allocatorGroup ctx, cancel := context.WithCancel(ctx) am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{ @@ -1389,3 +1388,14 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +func (am *AllocatorManager) startGlobalAllocatorLoop() { + globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) + if !ok { + // it should never happen + log.Error("failed to start global allocator loop, global allocator not found") + return + } + globalTSOAllocator.wg.Add(1) + go globalTSOAllocator.primaryElectionLoop() +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 53a6f65a25d..5c7c905089c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct { func NewGlobalTSOAllocator( ctx context.Context, am *AllocatorManager, - startGlobalLeaderLoop bool, ) Allocator { ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator( tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation), } - if startGlobalLeaderLoop { - gta.wg.Add(1) - go gta.primaryElectionLoop() - } - return gta } @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// primaryElectionLoop is used to maintain the TSO primary election and TSO's +// running allocator. It is only used in API mode. func (gta *GlobalTSOAllocator) primaryElectionLoop() { defer logutil.LogPanic() defer gta.wg.Done() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7c1d3426ce9..c19d790efc5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. 
- am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) + am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4cce39fa093..3869308d9dc 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "encoding/json" + errorspkg "errors" "fmt" "io" "math" @@ -43,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/ratelimit" @@ -56,6 +58,7 @@ import ( "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/unsaferecovery" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" @@ -88,12 +91,13 @@ const ( // nodeStateCheckJobInterval is the interval to run node state check job. nodeStateCheckJobInterval = 10 * time.Second // metricsCollectionJobInterval is the interval to run metrics collection job. - metricsCollectionJobInterval = 10 * time.Second - updateStoreStatsInterval = 9 * time.Millisecond - clientTimeout = 3 * time.Second - defaultChangedRegionsLimit = 10000 - gcTombstoneInterval = 30 * 24 * time.Hour - serviceCheckInterval = 10 * time.Second + metricsCollectionJobInterval = 10 * time.Second + updateStoreStatsInterval = 9 * time.Millisecond + clientTimeout = 3 * time.Second + defaultChangedRegionsLimit = 10000 + gcTombstoneInterval = 30 * 24 * time.Hour + schedulingServiceCheckInterval = 10 * time.Second + tsoServiceCheckInterval = 100 * time.Millisecond // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -144,6 +148,7 @@ type RaftCluster struct { cancel context.CancelFunc *core.BasicCluster // cached cluster info + member *member.EmbeddedEtcdMember etcdClient *clientv3.Client httpClient *http.Client @@ -174,6 +179,7 @@ type RaftCluster struct { keyspaceGroupManager *keyspace.GroupManager independentServices sync.Map hbstreams *hbstream.HeartbeatStreams + tsoAllocator *tso.AllocatorManager // heartbeatRunner is used to process the subtree update task asynchronously. heartbeatRunner ratelimit.Runner @@ -194,16 +200,18 @@ type Status struct { } // NewRaftCluster create a new cluster. 
-func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, - httpClient *http.Client) *RaftCluster { +func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, + httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster { return &RaftCluster{ serverCtx: ctx, clusterID: clusterID, + member: member, regionSyncer: regionSyncer, httpClient: httpClient, etcdClient: etcdClient, BasicCluster: basicCluster, storage: storage, + tsoAllocator: tsoAllocator, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), @@ -314,11 +322,13 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } + c.checkTSOService() cluster, err := c.LoadClusterInfo() if err != nil { return err } if cluster == nil { + log.Warn("cluster is not bootstrapped") return nil } @@ -351,7 +361,7 @@ func (c *RaftCluster) Start(s Server) error { return err } } - c.checkServices() + c.checkSchedulingService() c.wg.Add(9) go c.runServiceCheckJob() go c.runMetricsCollectionJob() @@ -370,7 +380,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } -func (c *RaftCluster) checkServices() { +func (c *RaftCluster) checkSchedulingService() { if c.isAPIServiceMode { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName) if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { @@ -390,25 +400,76 @@ func (c *RaftCluster) checkServices() { } } +// checkTSOService checks the TSO service. +func (c *RaftCluster) checkTSOService() { + if c.isAPIServiceMode { + return + } + + if err := c.startTSOJobs(); err != nil { + // If there is an error, need to wait for the next check. 
+ log.Error("failed to start TSO jobs", errs.ZapError(err)) + return + } +} + func (c *RaftCluster) runServiceCheckJob() { defer logutil.LogPanic() defer c.wg.Done() - ticker := time.NewTicker(serviceCheckInterval) + schedulingTicker := time.NewTicker(schedulingServiceCheckInterval) failpoint.Inject("highFrequencyClusterJobs", func() { - ticker.Reset(time.Millisecond) + schedulingTicker.Reset(time.Millisecond) }) - defer ticker.Stop() + defer schedulingTicker.Stop() for { select { case <-c.ctx.Done(): log.Info("service check job is stopped") return - case <-ticker.C: - c.checkServices() + case <-schedulingTicker.C: + c.checkSchedulingService() + } + } +} + +func (c *RaftCluster) startTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if !allocator.IsInitialize() { + log.Info("initializing the global TSO allocator") + if err := allocator.Initialize(0); err != nil { + log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) + return err } } + return nil +} + +func (c *RaftCluster) stopTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if allocator.IsInitialize() { + c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) + failpoint.Inject("updateAfterResetTSO", func() { + allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { + log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) + } + if allocator.IsInitialize() { + log.Panic("the allocator should be uninitialized after reset") + } + }) + } + + return nil } // startGCTuner @@ -757,6 +818,9 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(constant.SchedulingServiceName) { c.stopSchedulingJobs() } + if err := c.stopTSOJobs(); err != nil { + log.Error("failed to stop tso jobs", errs.ZapError(err)) + } c.heartbeatRunner.Stop() c.miscRunner.Stop() c.logRunner.Stop() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 94456f236f6..751e2664a5f 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -161,7 +161,7 @@ func TestStoreHeartbeat(t *testing.T) { re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp)) re.Equal("v1", cluster.GetStore(1).GetStoreLimit().Version()) time.Sleep(20 * time.Millisecond) - storeStats := cluster.hotStat.RegionStats(utils.Read, 3) + storeStats := cluster.hotStat.GetHotPeerStats(utils.Read, 3) re.Len(storeStats[1], 1) re.Equal(uint64(1), storeStats[1][0].RegionID) interval := float64(hotHeartBeat.Interval.EndTimestamp - hotHeartBeat.Interval.StartTimestamp) @@ -169,15 +169,15 @@ func TestStoreHeartbeat(t *testing.T) { re.Equal(float64(hotHeartBeat.PeerStats[0].ReadBytes)/interval, storeStats[1][0].Loads[utils.ByteDim]) re.Equal(float64(hotHeartBeat.PeerStats[0].ReadKeys)/interval, storeStats[1][0].Loads[utils.KeyDim]) re.Equal(float64(hotHeartBeat.PeerStats[0].QueryStats.Get)/interval, storeStats[1][0].Loads[utils.QueryDim]) - // After cold heartbeat, we won't find region 1 peer in regionStats + // After cold heartbeat, we won't find region 1 peer in HotPeerStats re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp)) time.Sleep(20 * time.Millisecond) - storeStats = 
cluster.hotStat.RegionStats(utils.Read, 1) + storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 1) re.Empty(storeStats[1]) // After hot heartbeat, we can find region 1 peer again re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp)) time.Sleep(20 * time.Millisecond) - storeStats = cluster.hotStat.RegionStats(utils.Read, 3) + storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3) re.Len(storeStats[1], 1) re.Equal(uint64(1), storeStats[1][0].RegionID) // after several cold heartbeats, and one hot heartbeat, we also can't find region 1 peer @@ -185,19 +185,19 @@ func TestStoreHeartbeat(t *testing.T) { re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp)) re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp)) time.Sleep(20 * time.Millisecond) - storeStats = cluster.hotStat.RegionStats(utils.Read, 0) + storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 0) re.Empty(storeStats[1]) re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp)) time.Sleep(20 * time.Millisecond) - storeStats = cluster.hotStat.RegionStats(utils.Read, 1) + storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 1) re.Empty(storeStats[1]) - storeStats = cluster.hotStat.RegionStats(utils.Read, 3) + storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3) re.Empty(storeStats[1]) // after 2 hot heartbeats, wo can find region 1 peer again re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp)) re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp)) time.Sleep(20 * time.Millisecond) - storeStats = cluster.hotStat.RegionStats(utils.Read, 3) + storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3) re.Len(storeStats[1], 1) re.Equal(uint64(1), storeStats[1][0].RegionID) } @@ -645,7 +645,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) - stats := cluster.hotStat.RegionStats(utils.Write, 0) + stats := cluster.hotStat.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 1) re.Len(stats[2], 1) re.Len(stats[3], 1) @@ -658,7 +658,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) - stats = cluster.hotStat.RegionStats(utils.Write, 0) + stats = cluster.hotStat.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 1) re.Empty(stats[2]) re.Len(stats[3], 1) @@ -1128,6 +1128,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { opt.SetReplicationConfig(cfg) re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) for i := uint64(1); i <= 4; i++ { var labels []*metapb.StoreLabel @@ -1162,13 +1163,42 @@ func TestRegionLabelIsolationLevel(t *testing.T) { StartKey: []byte{byte(1)}, EndKey: []byte{byte(2)}, } - r := core.NewRegionInfo(region, peers[0]) - re.NoError(cluster.putRegion(r)) + r1 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r1)) - cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r}) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1}) counter := cluster.labelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) + + region = &metapb.Region{ + Id: 10, + Peers: peers, + StartKey: []byte{byte(2)}, + EndKey: []byte{byte(3)}, + } + r2 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r2)) + + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r2}) + counter = cluster.labelStats.GetLabelCounter() + re.Equal(0, counter["none"]) + 
re.Equal(2, counter["zone"]) + + // issue: https://github.com/tikv/pd/issues/8700 + // step1: heartbeat a overlap region, which is used to simulate the case that the region is merged. + // step2: update region 9 and region 10, which is used to simulate the case that patrol is triggered. + // We should only count region 9. + overlapRegion := r1.Clone( + core.WithStartKey(r1.GetStartKey()), + core.WithEndKey(r2.GetEndKey()), + core.WithLeader(r2.GetPeer(8)), + ) + re.NoError(cluster.HandleRegionHeartbeat(overlapRegion)) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1, r2}) + counter = cluster.labelStats.GetLabelCounter() + re.Equal(0, counter["none"]) + re.Equal(1, counter["zone"]) } func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) { @@ -2566,7 +2596,7 @@ func TestCollectMetrics(t *testing.T) { rc.collectSchedulingMetrics() } stores := co.GetCluster().GetStores() - regionStats := co.GetCluster().RegionWriteStats() + regionStats := co.GetCluster().GetHotPeerStats(utils.Write) status1 := statistics.CollectHotPeerInfos(stores, regionStats) status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, utils.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow()) for _, s := range status2.AsLeader { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 794304999ab..b182410c016 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -236,6 +236,7 @@ func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.Reg for _, region := range regions { sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) } + sc.labelStats.ClearDefunctRegions() } func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { @@ -269,21 +270,17 @@ func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeI return sc.hotStat.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -// RegionStats is a thread-safe method -func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // As read stats are reported by store heartbeat, the threshold needs to be adjusted. - threshold := sc.opt.GetHotRegionCacheHitsThreshold() * - (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) - return sc.hotStat.RegionStats(utils.Read, threshold) -} - -// RegionWriteStats returns hot region's write stats. -// The result only includes peers that are hot enough. 
-func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - // RegionStats is a thread-safe method - return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold()) +func (sc *schedulingController) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + // GetHotPeerStats is a thread-safe method + threshold := sc.opt.GetHotRegionCacheHitsThreshold() + if rw == utils.Read { + threshold = sc.opt.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + } + return sc.hotStat.GetHotPeerStats(rw, threshold) } // BucketsStats returns hot region's buckets stats. diff --git a/server/config/config.go b/server/config/config.go index d8bd086225c..c64ee3831b0 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -133,14 +133,14 @@ type Config struct { AutoCompactionRetention string `toml:"auto-compaction-retention" json:"auto-compaction-retention-v2"` // TickInterval is the interval for etcd Raft tick. - TickInterval typeutil.Duration `toml:"tick-interval"` + TickInterval typeutil.Duration `toml:"tick-interval" json:"tick-interval"` // ElectionInterval is the interval for etcd Raft election. - ElectionInterval typeutil.Duration `toml:"election-interval"` + ElectionInterval typeutil.Duration `toml:"election-interval" json:"election-interval"` // Prevote is true to enable Raft Pre-Vote. // If enabled, Raft runs an additional election phase // to check whether it would get enough votes to win // an election, thus minimizing disruptions. - PreVote bool `toml:"enable-prevote"` + PreVote bool `toml:"enable-prevote" json:"enable-prevote"` MaxRequestBytes uint `toml:"max-request-bytes" json:"max-request-bytes"` @@ -149,12 +149,12 @@ type Config struct { LabelProperty LabelPropertyConfig `toml:"label-property" json:"label-property"` // For all warnings during parsing. - WarningMsgs []string + WarningMsgs []string `json:"-"` - DisableStrictReconfigCheck bool + DisableStrictReconfigCheck bool `json:"-"` - HeartbeatStreamBindInterval typeutil.Duration - LeaderPriorityCheckInterval typeutil.Duration + HeartbeatStreamBindInterval typeutil.Duration `json:"-"` + LeaderPriorityCheckInterval typeutil.Duration `json:"-"` Logger *zap.Logger `json:"-"` LogProps *log.ZapProperties `json:"-"` diff --git a/server/server.go b/server/server.go index c79f51d8153..26f8ebb614c 100644 --- a/server/server.go +++ b/server/server.go @@ -17,7 +17,6 @@ package server import ( "bytes" "context" - errorspkg "errors" "fmt" "math/rand" "net/http" @@ -475,7 +474,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. 
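The config.go hunk makes the JSON behavior of these fields explicit: etcd-related settings such as TickInterval gain a `json` name so they round-trip through config output, while internal fields like WarningMsgs are hidden with `json:"-"`. A small sketch of what the two tag forms do under encoding/json; the struct is a toy stand-in, not the real Config:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// config is a toy stand-in for server/config.Config.
type config struct {
	// Exported with an explicit name, so it shows up in the JSON payload.
	TickInterval time.Duration `toml:"tick-interval" json:"tick-interval"`
	// "-" tells encoding/json to skip the field entirely.
	WarningMsgs []string `json:"-"`
}

func main() {
	c := config{TickInterval: 500 * time.Millisecond, WarningMsgs: []string{"internal only"}}
	out, _ := json.Marshal(c)
	fmt.Println(string(out)) // {"tick-interval":500000000}
}
```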
if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { @@ -490,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error { s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() - s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient) + s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -1715,29 +1714,6 @@ func (s *Server) campaignLeader() { s.member.KeepLeader(ctx) log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name())) - if !s.IsAPIServiceMode() { - allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) - if err != nil { - log.Error("failed to get the global TSO allocator", errs.ZapError(err)) - return - } - log.Info("initializing the global TSO allocator") - if err := allocator.Initialize(0); err != nil { - log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) - return - } - defer func() { - s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false) - failpoint.Inject("updateAfterResetTSO", func() { - if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { - log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) - } - if allocator.IsInitialize() { - log.Panic("the allocator should be uninitialized after reset") - } - }) - }() - } if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", errs.ZapError(err)) return diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9f0b5f8d523..9574918a74a 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -361,7 +361,8 @@ func TestTSOFollowerProxy(t *testing.T) { defer cli1.Close() cli2 := setupCli(ctx, re, endpoints) defer cli2.Close() - cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + re.NoError(err) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber) @@ -385,6 +386,39 @@ func TestTSOFollowerProxy(t *testing.T) { }() } wg.Wait() + + // Disable the follower proxy and check if the stream is updated. + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false) + re.NoError(err) + + wg.Add(tsoRequestConcurrencyNumber) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var lastTS uint64 + for i := 0; i < tsoRequestRound; i++ { + physical, logical, err := cli2.GetTS(context.Background()) + if err != nil { + // It can only be the context canceled error caused by the stale stream cleanup. + re.ErrorContains(err, "context canceled") + continue + } + re.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + // After requesting with the follower proxy, request with the leader directly. + physical, logical, err = cli1.GetTS(context.Background()) + re.NoError(err) + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + // Ensure at least one request is successful. 
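+			// lastTS is only updated after a successful GetTS call, so a zero value
+			// here would mean that every request in this goroutine failed.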
+ re.NotEmpty(lastTS) + }() + } + wg.Wait() } func TestTSOFollowerProxyWithTSOService(t *testing.T) { diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index d134336966a..e55f0281d72 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -86,7 +86,8 @@ func (suite *metaTestSuite) TestStoreWatch() { ) } - suite.getRaftCluster().RemoveStore(2, false) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/doNotBuryStore", `return(true)`)) + re.NoError(suite.getRaftCluster().RemoveStore(2, false)) testutil.Eventually(re, func() bool { s := cluster.GetStore(2) if s == nil { @@ -95,6 +96,7 @@ func (suite *metaTestSuite) TestStoreWatch() { return s.GetState() == metapb.StoreState_Offline }) re.Len(cluster.GetStores(), 4) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/doNotBuryStore")) testutil.Eventually(re, func() bool { return cluster.GetStore(2).GetState() == metapb.StoreState_Tombstone }) diff --git a/tests/integrations/realcluster/Makefile b/tests/integrations/realcluster/Makefile index 28c918ec2bf..8550f27c58a 100644 --- a/tests/integrations/realcluster/Makefile +++ b/tests/integrations/realcluster/Makefile @@ -28,9 +28,17 @@ tidy: git diff go.mod go.sum | cat git diff --quiet go.mod go.sum -check: deploy test kill_cluster +check: tiup test -deploy: kill_cluster +tiup: + # if tiup binary not exist, download it + if ! which tiup > /dev/null 2>&1; then \ + curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh; \ + fi + +deploy: kill_cluster deploy_only + +deploy_only: @ echo "deploying..." ./deploy.sh @ echo "wait cluster ready..." diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go new file mode 100644 index 00000000000..27bede7fa1d --- /dev/null +++ b/tests/integrations/realcluster/cluster_id_test.go @@ -0,0 +1,85 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "context" + "os/exec" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + pd "github.com/tikv/pd/client" +) + +type clusterIDSuite struct { + realClusterSuite +} + +func TestClusterID(t *testing.T) { + suite.Run(t, &clusterIDSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "cluster_id", + }, + }) +} + +func (s *clusterIDSuite) TestClientClusterID() { + re := require.New(s.T()) + ctx := context.Background() + // deploy second cluster + s.startRealCluster(s.T()) + defer s.stopRealCluster(s.T()) + + pdEndpoints := getPDEndpoints(s.T()) + // Try to create a client with the mixed endpoints. 
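+	// getPDEndpoints collects PD addresses from every running tikv-server process,
+	// so with two clusters deployed the endpoint list spans both clusters and the
+	// client creation is expected to fail with an unmatched cluster id error.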
+ _, err := pd.NewClientWithContext( + ctx, pdEndpoints, + pd.SecurityOption{}, pd.WithMaxErrorRetry(1), + ) + re.Error(err) + re.Contains(err.Error(), "unmatched cluster id") +} + +func getPDEndpoints(t *testing.T) []string { + cmd := exec.Command("sh", "-c", "ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'") + bytes, err := cmd.Output() + require.NoError(t, err) + pdAddrsForEachTikv := strings.Split(string(bytes), "\n") + var pdAddrs []string + for _, addr := range pdAddrsForEachTikv { + // length of addr is less than 5 means it must not be a valid address + if len(addr) < 5 { + continue + } + pdAddrs = append(pdAddrs, strings.Split(addr, ",")...) + } + return removeDuplicates(pdAddrs) +} + +func removeDuplicates(arr []string) []string { + uniqueMap := make(map[string]bool) + var result []string + + for _, item := range arr { + if _, exists := uniqueMap[item]; !exists { + uniqueMap[item] = true + result = append(result, item) + } + } + + return result +} diff --git a/tests/integrations/realcluster/deploy.sh b/tests/integrations/realcluster/deploy.sh index f6f567314f0..a7991d559b4 100755 --- a/tests/integrations/realcluster/deploy.sh +++ b/tests/integrations/realcluster/deploy.sh @@ -28,9 +28,9 @@ else # CI will download the binaries in the prepare phase. # ref https://github.com/PingCAP-QE/ci/blob/387e9e533b365174962ccb1959442a7070f9cd66/pipelines/tikv/pd/latest/pull_integration_realcluster_test.groovy#L55-L68 color-green "using existing binaries..." - $TIUP_BIN_DIR playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 --without-monitor \ + $TIUP_BIN_DIR playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 --without-monitor --tag pd_real_cluster_test \ --pd.binpath ./bin/pd-server --kv.binpath ./bin/tikv-server --db.binpath ./bin/tidb-server \ - --tiflash.binpath ./bin/tiflash --tag pd_real_cluster_test --pd.config ./tests/integrations/realcluster/pd.toml \ + --tiflash.binpath ./bin/tiflash --pd.config ./tests/integrations/realcluster/pd.toml \ > $CUR_PATH/playground.log 2>&1 & fi diff --git a/tests/integrations/realcluster/download_integration_test_binaries.sh b/tests/integrations/realcluster/download_integration_test_binaries.sh new file mode 100644 index 00000000000..8d4cc3411a8 --- /dev/null +++ b/tests/integrations/realcluster/download_integration_test_binaries.sh @@ -0,0 +1,80 @@ +#! /usr/bin/env bash + +# help +# download some third party tools for integration test +# example: ./download_integration_test_binaries.sh master + + +set -o errexit +set -o pipefail + + +# Specify which branch to be utilized for executing the test, which is +# exclusively accessible when obtaining binaries from +# http://fileserver.pingcap.net. 
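+#
+# Usage (hypothetical example; both arguments are optional and default to
+# "master" and the PingCAP file server respectively):
+#   ./download_integration_test_binaries.sh master http://fileserver.pingcap.net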
+branch=${1:-master} +file_server_url=${2:-http://fileserver.pingcap.net} + +tidb_sha1_url="${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1" +tikv_sha1_url="${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1" +tiflash_sha1_url="${file_server_url}/download/refs/pingcap/tiflash/${branch}/sha1" + +tidb_sha1=$(curl "$tidb_sha1_url") +tikv_sha1=$(curl "$tikv_sha1_url") +tiflash_sha1=$(curl "$tiflash_sha1_url") + +# download tidb / tikv / tiflash binary build from tibuid multibranch pipeline +tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz" +tikv_download_url="${file_server_url}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" +tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" + +set -o nounset + +# See https://misc.flogisoft.com/bash/tip_colors_and_formatting. +color_green() { # Green + echo -e "\x1B[1;32m${*}\x1B[0m" +} + +function download() { + local url=$1 + local file_name=$2 + local file_path=$3 + if [[ -f "${file_path}" ]]; then + echo "file ${file_name} already exists, skip download" + return + fi + echo "download ${file_name} from ${url}" + wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}" +} + +function main() { + rm -rf third_bin + rm -rf tmp + mkdir third_bin + mkdir tmp + + # tidb server + download "$tidb_download_url" "tidb-server.tar.gz" "tmp/tidb-server.tar.gz" + tar -xzf tmp/tidb-server.tar.gz -C third_bin --wildcards 'bin/*' + mv third_bin/bin/* third_bin/ + + # TiKV server + download "$tikv_download_url" "tikv-server.tar.gz" "tmp/tikv-server.tar.gz" + tar -xzf tmp/tikv-server.tar.gz -C third_bin --wildcards 'bin/*' + mv third_bin/bin/* third_bin/ + + # TiFlash + download "$tiflash_download_url" "tiflash.tar.gz" "tmp/tiflash.tar.gz" + tar -xzf tmp/tiflash.tar.gz -C third_bin + mv third_bin/tiflash third_bin/_tiflash + mv third_bin/_tiflash/* third_bin && rm -rf third_bin/_tiflash + + chmod +x third_bin/* + rm -rf tmp + rm -rf third_bin/bin + ls -alh third_bin/ +} + +main "$@" + +color_green "Download SUCCESS" diff --git a/tests/integrations/realcluster/real_cluster.go b/tests/integrations/realcluster/real_cluster.go new file mode 100644 index 00000000000..1843b78a528 --- /dev/null +++ b/tests/integrations/realcluster/real_cluster.go @@ -0,0 +1,167 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" +) + +type realClusterSuite struct { + suite.Suite + + clusterCnt int + suiteName string +} + +var ( + playgroundLogDir = filepath.Join("tmp", "real_cluster", "playground") + tiupBin = os.Getenv("HOME") + "/.tiup/bin/tiup" +) + +// SetupSuite will run before the tests in the suite are run. 
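+// It removes any stale TiUP data directories left over from previous runs of
+// this suite, deploys a fresh `tiup playground` cluster, and registers a
+// cleanup hook that tears the cluster down once the suite has finished.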
+func (s *realClusterSuite) SetupSuite() {
+	t := s.T()
+
+	// Clean the data dir. It is the default data dir of TiUP.
+	dataDir := filepath.Join(os.Getenv("HOME"), ".tiup", "data", "pd_real_cluster_test_"+s.suiteName+"_*")
+	matches, err := filepath.Glob(dataDir)
+	require.NoError(t, err)
+
+	for _, match := range matches {
+		require.NoError(t, runCommand("rm", "-rf", match))
+	}
+	s.startRealCluster(t)
+	t.Cleanup(func() {
+		s.stopRealCluster(t)
+	})
+}
+
+// TearDownSuite will run after all the tests in the suite have been run.
+func (s *realClusterSuite) TearDownSuite() {
+	// Even if the cluster deployment fails, we still need to destroy the cluster.
+	// If the deployment did not fail, the cluster has already been destroyed by
+	// the cleanup function registered in SetupSuite, and this code has no effect.
+	s.clusterCnt++
+	s.stopRealCluster(s.T())
+}
+
+func (s *realClusterSuite) startRealCluster(t *testing.T) {
+	log.Info("start to deploy a real cluster")
+
+	tag := s.tag()
+	deployTiupPlayground(t, tag)
+	waitTiupReady(t, tag)
+	s.clusterCnt++
+}
+
+func (s *realClusterSuite) stopRealCluster(t *testing.T) {
+	s.clusterCnt--
+
+	log.Info("start to destroy a real cluster", zap.String("tag", s.tag()))
+	destroy(t, s.tag())
+	time.Sleep(5 * time.Second)
+}
+
+func (s *realClusterSuite) tag() string {
+	return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt)
+}
+
+func (s *realClusterSuite) restart() {
+	tag := s.tag()
+	log.Info("start to restart", zap.String("tag", tag))
+	s.stopRealCluster(s.T())
+	s.startRealCluster(s.T())
+	log.Info("TiUP restart success")
+}
+
+func destroy(t *testing.T, tag string) {
+	cmdStr := fmt.Sprintf("ps -ef | grep %s | awk '{print $2}'", tag)
+	cmd := exec.Command("sh", "-c", cmdStr)
+	bytes, err := cmd.Output()
+	require.NoError(t, err)
+	pids := string(bytes)
+	pidArr := strings.Split(pids, "\n")
+	for _, pid := range pidArr {
+		// nolint:errcheck
+		runCommand("sh", "-c", "kill -9 "+pid)
+	}
+	log.Info("destroy success", zap.String("tag", tag))
+}
+
+func deployTiupPlayground(t *testing.T, tag string) {
+	curPath, err := os.Getwd()
+	require.NoError(t, err)
+	require.NoError(t, os.Chdir("../../.."))
+
+	if !fileExists("third_bin") || !fileExists("third_bin/tikv-server") || !fileExists("third_bin/tidb-server") || !fileExists("third_bin/tiflash") {
+		log.Info("downloading binaries...")
+		log.Info("this may take a few minutes, you can also download them manually and put them in the third_bin directory.")
+		require.NoError(t, runCommand("sh",
+			"./tests/integrations/realcluster/download_integration_test_binaries.sh"))
+	}
+	if !fileExists("bin") || !fileExists("bin/pd-server") {
+		log.Info("compile pd binaries...")
+		require.NoError(t, runCommand("make", "pd-server"))
+	}
+
+	if !fileExists(playgroundLogDir) {
+		require.NoError(t, os.MkdirAll(playgroundLogDir, 0755))
+	}
+	// nolint:errcheck
+	go func() {
+		runCommand("sh", "-c",
+			tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \
+			--without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \
+			--kv.binpath ./third_bin/tikv-server \
+			--db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \
+			--pd.config ./tests/integrations/realcluster/pd.toml \
+			> `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `)
+	}()
+
+	// Avoid changing the directory back before `tiup playground` has been started.
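+	// The sleep below gives the backgrounded playground process time to launch
+	// from the repository root; waitTiupReady then polls until the playground
+	// reports ready.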
+ time.Sleep(10 * time.Second) + require.NoError(t, os.Chdir(curPath)) +} + +func waitTiupReady(t *testing.T, tag string) { + const ( + interval = 5 + maxTimes = 20 + ) + log.Info("start to wait TiUP ready", zap.String("tag", tag)) + for i := 0; i < maxTimes; i++ { + err := runCommand(tiupBin, "playground", "display", "--tag", tag) + if err == nil { + log.Info("TiUP is ready", zap.String("tag", tag)) + return + } + + log.Info("TiUP is not ready, will retry", zap.Int("retry times", i), + zap.String("tag", tag), zap.Error(err)) + time.Sleep(time.Duration(interval) * time.Second) + } + require.Failf(t, "TiUP is not ready", "tag: %s", tag) +} diff --git a/tests/integrations/realcluster/reboot_pd_test.go b/tests/integrations/realcluster/reboot_pd_test.go index 9f2b286e9b1..e3f37ac0605 100644 --- a/tests/integrations/realcluster/reboot_pd_test.go +++ b/tests/integrations/realcluster/reboot_pd_test.go @@ -16,31 +16,31 @@ package realcluster import ( "context" - "os" - "os/exec" "testing" - "github.com/pingcap/log" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/http" ) -func restartTiUP() { - log.Info("start to restart TiUP") - cmd := exec.Command("make", "deploy") - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Run() - if err != nil { - panic(err) - } - log.Info("TiUP restart success") +type rebootPDSuite struct { + realClusterSuite +} + +func TestRebootPD(t *testing.T) { + suite.Run(t, &rebootPDSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "reboot_pd", + }, + }) } // https://github.com/tikv/pd/issues/6467 -func TestReloadLabel(t *testing.T) { - re := require.New(t) +func (s *rebootPDSuite) TestReloadLabel() { + re := require.New(s.T()) ctx := context.Background() + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) resp, err := pdHTTPCli.GetStores(ctx) re.NoError(err) re.NotEmpty(resp.Stores) @@ -74,7 +74,8 @@ func TestReloadLabel(t *testing.T) { } // Check the label is set checkLabelsAreEqual() - // Restart TiUP to reload the label - restartTiUP() + // Restart to reload the label + s.restart() + pdHTTPCli = http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) checkLabelsAreEqual() } diff --git a/tests/integrations/realcluster/scheduler_test.go b/tests/integrations/realcluster/scheduler_test.go index 98a18158114..69da846b491 100644 --- a/tests/integrations/realcluster/scheduler_test.go +++ b/tests/integrations/realcluster/scheduler_test.go @@ -22,19 +22,33 @@ import ( "time" "github.com/stretchr/testify/require" - pd "github.com/tikv/pd/client/http" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/testutil" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/types" ) +type schedulerSuite struct { + realClusterSuite +} + +func TestScheduler(t *testing.T) { + suite.Run(t, &schedulerSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "scheduler", + }, + }) +} + // https://github.com/tikv/pd/issues/6988#issuecomment-1694924611 // https://github.com/tikv/pd/issues/6897 -func TestTransferLeader(t *testing.T) { - re := require.New(t) +func (s *schedulerSuite) TestTransferLeader() { + re := require.New(s.T()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) resp, err := pdHTTPCli.GetLeader(ctx) re.NoError(err) oldLeader := resp.Name @@ -79,11 +93,12 @@ func TestTransferLeader(t *testing.T) { re.Len(res, 
oldSchedulersLen) } -func TestRegionLabelDenyScheduler(t *testing.T) { - re := require.New(t) +func (s *schedulerSuite) TestRegionLabelDenyScheduler() { + re := require.New(s.T()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) regions, err := pdHTTPCli.GetRegions(ctx) re.NoError(err) re.NotEmpty(regions.Regions) @@ -114,15 +129,15 @@ func TestRegionLabelDenyScheduler(t *testing.T) { }, testutil.WithWaitFor(time.Minute)) // disable schedule for region1 - labelRule := &pd.LabelRule{ + labelRule := &http.LabelRule{ ID: "rule1", - Labels: []pd.RegionLabel{{Key: "schedule", Value: "deny"}}, + Labels: []http.RegionLabel{{Key: "schedule", Value: "deny"}}, RuleType: "key-range", Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), } re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) defer func() { - pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + pdHTTPCli.PatchRegionLabelRules(ctx, &http.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) }() labelRules, err := pdHTTPCli.GetAllRegionLabelRules(ctx) re.NoError(err) @@ -170,7 +185,7 @@ func TestRegionLabelDenyScheduler(t *testing.T) { return true }, testutil.WithWaitFor(time.Minute)) - pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + pdHTTPCli.PatchRegionLabelRules(ctx, &http.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) re.NoError(err) re.Len(labelRules, 1) diff --git a/tests/integrations/realcluster/ts_test.go b/tests/integrations/realcluster/ts_test.go index 5d970556fbc..f19124d04a4 100644 --- a/tests/integrations/realcluster/ts_test.go +++ b/tests/integrations/realcluster/ts_test.go @@ -18,12 +18,25 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) +type tsSuite struct { + realClusterSuite +} + func TestTS(t *testing.T) { - re := require.New(t) + suite.Run(t, &tsSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "ts", + }, + }) +} + +func (s *tsSuite) TestTS() { + re := require.New(s.T()) - db := OpenTestDB(t) + db := OpenTestDB(s.T()) db.MustExec("use test") db.MustExec("drop table if exists t") db.MustExec("create table t(a int, index i(a))") diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index f6c8295b6ef..789ceaa29c2 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -15,18 +15,13 @@ package realcluster import ( + "os" + "os/exec" "time" - - "github.com/tikv/pd/client/http" ) const physicalShiftBits = 18 -var ( - pdAddrs = []string{"http://127.0.0.1:2379"} - pdHTTPCli = http.NewClient("pd-real-cluster-test", pdAddrs) -) - // GetTimeFromTS extracts time.Time from a timestamp. func GetTimeFromTS(ts uint64) time.Time { ms := ExtractPhysical(ts) @@ -37,3 +32,15 @@ func GetTimeFromTS(ts uint64) time.Time { func ExtractPhysical(ts uint64) int64 { return int64(ts >> physicalShiftBits) } + +func runCommand(name string, args ...string) error { + cmd := exec.Command(name, args...) 
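+	// Stream the child process's output to the test runner's stdout/stderr so
+	// that deployment and teardown logs are visible in the test output.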
+ cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + return !os.IsNotExist(err) +} diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a669e093200..d1a649cbfa6 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { return err == nil }) // Resign leader to trigger the TSO resetting. - re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)")) oldLeaderName := suite.cluster.WaitLeader() re.NotEmpty(oldLeaderName) err := suite.cluster.GetServer(oldLeaderName).ResignLeader() re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO")) newLeaderName := suite.cluster.WaitLeader() re.NotEmpty(newLeaderName) re.NotEqual(oldLeaderName, newLeaderName) diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go index 9ebe6dec8af..f82f58ee6c8 100644 --- a/tests/integrations/tso/consistency_test.go +++ b/tests/integrations/tso/consistency_test.go @@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() re.NotEmpty(leaderName) suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.pdLeaderServer.BootstrapCluster() backendEndpoints := suite.pdLeaderServer.GetAddr() if suite.legacy { suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints) diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index 5e14611ab65..651a1df96b4 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -79,6 +79,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() re.NotEmpty(leaderName) suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.pdLeaderServer.BootstrapCluster() backendEndpoints := suite.pdLeaderServer.GetAddr() if suite.legacy { suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index a9be92d19e9..e1a56982f2d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) // Cluster is not bootstrapped. 
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) @@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) { } re.NoError(testStorage.Flush()) - raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) @@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123} diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go index 692aec490eb..257cd3b6a34 100644 --- a/tests/server/tso/allocator_test.go +++ b/tests/server/tso/allocator_test.go @@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitAllLeaders(re, dcLocationConfig) + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() // Wait for all nodes becoming healthy. time.Sleep(time.Second * 5) diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go index d8f64afe871..c340c44d3d2 100644 --- a/tests/server/tso/global_tso_test.go +++ b/tests/server/tso/global_tso_test.go @@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) { var leaderServer, nextLeaderServer *tests.TestServer leaderServer = cluster.GetLeaderServer() re.NotNil(leaderServer) + leaderServer.BootstrapCluster() for _, s := range cluster.GetServers() { if s.GetConfig().Name != cluster.GetLeader() { nextLeaderServer = s @@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index 5be37e293cf..fc2f5999840 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitAllLeaders(re, dcLocationConfig) + leaderServer := cluster.GetLeaderServer() + leaderServer.BootstrapCluster() requestLocalTSOs(re, cluster, dcLocationConfig) // Reboot the cluster. @@ -125,7 +127,7 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) // Re-request the global TSOs. 
- leaderServer := cluster.GetLeaderServer() + leaderServer = cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() req := &pdpb.TsoRequest{ diff --git a/tools/pd-backup/tests/backup_test.go b/tools/pd-backup/tests/backup_test.go index 2a55a790849..05d2b7b92ed 100644 --- a/tools/pd-backup/tests/backup_test.go +++ b/tools/pd-backup/tests/backup_test.go @@ -37,6 +37,8 @@ func TestBackup(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) re.NotEmpty(cluster.WaitLeader()) + leaderServer := cluster.GetLeaderServer() + leaderServer.BootstrapCluster() pdAddr := cluster.GetConfig().GetClientURL() urls := strings.Split(pdAddr, ",") defer cluster.Destroy()