diff --git a/go.mod b/go.mod
index aa8105ceb..dcb060e11 100644
--- a/go.mod
+++ b/go.mod
@@ -59,3 +59,5 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
+
+replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20240408102041-05784a0bc698
diff --git a/go.sum b/go.sum
index cf2c4a7d9..9f1395284 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,8 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/MyonKeminta/kvproto v0.0.0-20240408102041-05784a0bc698 h1:3eRDRr/Z6I4x62V6i1/pO/ZYVeqm5Sk2Rctr/2AUSNM=
+github.com/MyonKeminta/kvproto v0.0.0-20240408102041-05784a0bc698/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@@ -74,8 +76,6 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
-github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 h1:nU9wDeMsID8EWawRQVdmRYcNhUrlI4TKogZhXleG4QQ=
-github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index e109db5a7..586207865 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -631,6 +631,8 @@ type RegionCache struct {
 	codec            apicodec.Codec
 	enableForwarding bool

+	requestHealthFeedbackCallback func(ctx context.Context, addr string) error
+
 	mu regionIndexMu

 	storeMu struct {
@@ -656,7 +658,8 @@ type RegionCache struct {
 }

 type regionCacheOptions struct {
-	noHealthTick bool
+	noHealthTick          bool
+	requestHealthFeedback func(ctx context.Context, addr string) error
 }

 type RegionCacheOpt func(*regionCacheOptions)
@@ -665,6 +668,12 @@ func RegionCacheNoHealthTick(o *regionCacheOptions) {
 	o.noHealthTick = true
 }

+func WithRequestHealthFeedback(callback func(ctx context.Context, addr string) error) RegionCacheOpt {
+	return func(options *regionCacheOptions) {
+		options.requestHealthFeedback = callback
+	}
+}
+
 // NewRegionCache creates a RegionCache.
 func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
 	var options regionCacheOptions
@@ -673,7 +682,8 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
 	}

 	c := &RegionCache{
-		pdClient: pdClient,
+		pdClient:                      pdClient,
+		requestHealthFeedbackCallback: options.requestHealthFeedback,
 	}

 	c.codec = apicodec.NewCodecV1(apicodec.ModeRaw)
@@ -721,7 +731,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
 		return false
 	}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
 	if !options.noHealthTick {
-		c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
+		c.bg.schedule(c.checkAndUpdateStoreHealthStatus, time.Duration(refreshStoreInterval/4)*time.Second)
 	}
 	c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
 	if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
@@ -2645,7 +2655,7 @@ func (r *Region) ContainsByEnd(key []byte) bool {
 }

 // checkAndUpdateStoreHealthStatus checks and updates health stats on each store.
-func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
+func (c *RegionCache) checkAndUpdateStoreHealthStatus(ctx context.Context, now time.Time) bool {
 	defer func() {
 		r := recover()
 		if r != nil {
@@ -2657,16 +2667,18 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
 			}
 		}
 	}()
-	healthDetails := make(map[uint64]HealthStatusDetail)
-	now := time.Now()
+
+	var stores []*Store
 	c.forEachStore(func(store *Store) {
-		store.healthStatus.tick(now)
-		healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail()
+		stores = append(stores, store)
 	})
-	for store, details := range healthDetails {
-		metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.ClientSideSlowScore))
-		metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.TiKVSideSlowScore))
+	for _, store := range stores {
+		store.healthStatus.tick(ctx, now, store, c.requestHealthFeedbackCallback)
+		healthDetails := store.healthStatus.GetHealthStatusDetail()
+		metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.ClientSideSlowScore))
+		metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.TiKVSideSlowScore))
 	}
+
+	return false
 }

 // reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 3369fc4fd..fd8d3758c 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -57,6 +57,7 @@ import (
 	"github.com/tikv/client-go/v2/internal/apicodec"
 	"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
 	"github.com/tikv/client-go/v2/kv"
+	"github.com/tikv/client-go/v2/tikvrpc"
 	pd "github.com/tikv/pd/client"
 	uatomic "go.uber.org/atomic"
 )
@@ -2088,10 +2089,14 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
 }

 func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
+	store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil)
+	store.livenessState = uint32(reachable)
+	ctx := context.Background()
+
 	stats := newStoreHealthStatus(1)
 	s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
 	now := time.Now()
-	stats.tick(now)
+	stats.tick(ctx, now, store, nil)
 	s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
 	s.False(stats.tikvSideSlowScore.hasTiKVFeedback.Load())
 	s.False(stats.IsSlow())
@@ -2108,22 +2113,26 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
 	s.True(stats.IsSlow())

 	now = now.Add(time.Minute * 2)
-	stats.tick(now)
+	stats.tick(ctx, now, store, nil)
 	s.Equal(int64(60), stats.GetHealthStatusDetail().TiKVSideSlowScore)
 	s.False(stats.IsSlow())

 	now = now.Add(time.Minute * 3)
-	stats.tick(now)
+	stats.tick(ctx, now, store, nil)
 	s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore)
 	s.False(stats.IsSlow())

 	now = now.Add(time.Minute)
-	stats.tick(now)
+	stats.tick(ctx, now, store, nil)
 	s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore)
 	s.False(stats.IsSlow())
 }

 func (s *testRegionCacheSuite) TestStoreHealthStatus() {
+	store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil)
+	store.livenessState = uint32(reachable)
+	ctx := context.Background()
+
 	stats := newStoreHealthStatus(1)
 	now := time.Now()
 	s.False(stats.IsSlow())
@@ -2131,7 +2140,7 @@ func (s *testRegionCacheSuite) TestStoreHealthStatus() {
 	for !stats.clientSideSlowScore.isSlow() {
 		stats.clientSideSlowScore.recordSlowScoreStat(time.Minute)
 	}
-	stats.tick(now)
+	stats.tick(ctx, now, store, nil)
 	s.True(stats.IsSlow())
 	s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore)
@@ -2142,7 +2151,7 @@
 	for stats.clientSideSlowScore.isSlow() {
 		stats.clientSideSlowScore.recordSlowScoreStat(time.Millisecond)
-		stats.tick(now)
+		stats.tick(ctx, now, store, nil)
 	}
 	s.True(stats.IsSlow())
 	s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore)
diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go
index 94ea5bfed..d0e8db1f0 100644
--- a/internal/locate/store_cache.go
+++ b/internal/locate/store_cache.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -790,7 +791,7 @@ const (
 	tikvSlowScoreSlowThreshold int64 = 80

 	tikvSlowScoreUpdateInterval       = time.Millisecond * 100
-	tikvSlowScoreUpdateFromPDInterval = time.Minute
+	tikvSlowScoreActiveUpdateInterval = time.Second * 30
 )

 type StoreHealthStatus struct {
@@ -848,9 +849,10 @@ func (s *StoreHealthStatus) GetHealthStatusDetail() HealthStatusDetail {

 // tick updates the health status that changes over time, such as slow score's decaying, etc. This function is expected
 // to be called periodically.
-func (s *StoreHealthStatus) tick(now time.Time) {
+func (s *StoreHealthStatus) tick(ctx context.Context, now time.Time, store *Store, requestHealthFeedbackCallback func(ctx context.Context, addr string) error) {
+	metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "tick").Inc()
 	s.clientSideSlowScore.updateSlowScore()
-	s.updateTiKVServerSideSlowScoreOnTick(now)
+	s.updateTiKVServerSideSlowScoreOnTick(ctx, now, store, requestHealthFeedbackCallback)
 	s.updateSlowFlag()
 }

@@ -868,17 +870,49 @@ func (s *StoreHealthStatus) markAlreadySlow() {

 // updateTiKVServerSideSlowScoreOnTick updates the slow score actively, which is expected to be a periodic job.
 // It skips updating if the last update time didn't elapse long enough, or it's being updated concurrently.
-func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) {
+func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(ctx context.Context, now time.Time, store *Store, requestHealthFeedbackCallback func(ctx context.Context, addr string) error) {
 	if !s.tikvSideSlowScore.hasTiKVFeedback.Load() {
 		// Do nothing if no feedback has been received from this store yet.
 		return
 	}
-	lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load()
-	if lastUpdateTime == nil || now.Sub(*lastUpdateTime) < tikvSlowScoreUpdateFromPDInterval {
-		// If the first feedback is
+
+	needRefreshing := func() bool {
+		lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load()
+		if lastUpdateTime == nil {
+			// If the first feedback hasn't been received yet, assume the store doesn't support feeding back and skip the tick.
+			return false
+		}
+
+		return now.Sub(*lastUpdateTime) >= tikvSlowScoreActiveUpdateInterval
+	}
+
+	if !needRefreshing() {
 		return
 	}

+	// If not updated for too long, try to actively fetch it from TiKV.
+	// Note that this can't be done while holding the mutex, because the updating is done by the client when receiving
+	// the response (in the same way as handling the feedback information pushed from TiKV), which needs acquiring the
+	// mutex.
+	if requestHealthFeedbackCallback != nil && store.getLivenessState() == reachable {
+		addr := store.GetAddr()
+		if len(addr) == 0 {
+			logutil.Logger(ctx).Warn("skip actively request health feedback info from store due to unknown addr", zap.Uint64("storeID", store.StoreID()))
+		} else {
+			metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "active_update").Inc()
+			err := requestHealthFeedbackCallback(ctx, store.GetAddr())
+			if err != nil {
+				metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "active_update_err").Inc()
+				logutil.Logger(ctx).Warn("actively request health feedback info from store got error", zap.Uint64("storeID", store.StoreID()), zap.Error(err))
+			}
+		}
+
+		// Continue if active updating is unsuccessful.
+		if !needRefreshing() {
+			return
+		}
+	}
+
 	if !s.tikvSideSlowScore.TryLock() {
 		// It must be being updated concurrently.
return @@ -886,16 +920,13 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) { defer s.tikvSideSlowScore.Unlock() // Reload update time as it might be updated concurrently before acquiring mutex - lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() elapsed := now.Sub(*lastUpdateTime) - if elapsed < tikvSlowScoreUpdateFromPDInterval { + if elapsed < tikvSlowScoreActiveUpdateInterval { return } - // TODO: Try to get store status from PD here. But it's not mandatory. - // Don't forget to update tests if getting slow score from PD is implemented here. - - // If updating from PD is not successful: decay the slow score. + // If requesting from TiKV is not successful: decay the slow score. score := s.tikvSideSlowScore.score.Load() if score < 1 { return diff --git a/metrics/metrics.go b/metrics/metrics.go index f5e8827b0..28ea16a9e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -102,6 +102,7 @@ var ( TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec + TiKVHealthFeedbackOpsCounter *prometheus.CounterVec TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec TiKVStaleReadCounter *prometheus.CounterVec TiKVStaleReadReqCounter *prometheus.CounterVec @@ -716,6 +717,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblStore}) + TiKVHealthFeedbackOpsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "health_feedback_ops_counter", + Help: "Counter of operations about TiKV health feedback", + ConstLabels: constLabels, + }, []string{LblScope, LblType}) + TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -857,6 +867,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) prometheus.MustRegister(TiKVFeedbackSlowScoreGauge) + prometheus.MustRegister(TiKVHealthFeedbackOpsCounter) prometheus.MustRegister(TiKVPreferLeaderFlowsGauge) prometheus.MustRegister(TiKVStaleReadCounter) prometheus.MustRegister(TiKVStaleReadReqCounter) diff --git a/tikv/kv.go b/tikv/kv.go index ac189a8c1..b903d080c 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -236,6 +236,23 @@ func loadOption(store *KVStore, opt ...Option) { } } +const requestHealthFeedbackTimeout = time.Second * 2 + +func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvClient Client) error { + resp, err := tikvClient.SendRequest(ctx, addr, tikvrpc.NewRequest(tikvrpc.CmdRequestHealthFeedback, &kvrpcpb.RequestHealthFeedbackRequest{}), requestHealthFeedbackTimeout) + if err != nil { + return err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return err + } + if regionErr != nil { + return errors.Errorf("requested health feedback from store but received region error: %s", regionErr.String()) + } + return nil +} + // NewKVStore creates a new TiKV store instance. 
 func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) {
 	o, err := oracles.NewPdOracle(pdClient, defaultOracleUpdateInterval)
@@ -243,7 +260,9 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
 		return nil, err
 	}
 	ctx, cancel := context.WithCancel(context.Background())
-	regionCache := locate.NewRegionCache(pdClient)
+	regionCache := locate.NewRegionCache(pdClient, locate.WithRequestHealthFeedback(func(ctx context.Context, addr string) error {
+		return requestHealthFeedbackFromKVClient(ctx, addr, tikvclient)
+	}))
 	store := &KVStore{
 		clusterID: pdClient.GetClusterID(context.TODO()),
 		uuid:      uuid,
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index 50821516e..89de0027c 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -99,6 +99,8 @@ const (
 	CmdStoreSafeTS
 	CmdLockWaitInfo

+	CmdRequestHealthFeedback
+
 	CmdCop CmdType = 512 + iota
 	CmdCopStream
 	CmdBatchCop
@@ -217,6 +219,8 @@ func (t CmdType) String() string {
 		return "StoreSafeTS"
 	case CmdLockWaitInfo:
 		return "LockWaitInfo"
+	case CmdRequestHealthFeedback:
+		return "RequestHealthFeedback"
 	case CmdFlashbackToVersion:
 		return "FlashbackToVersion"
 	case CmdPrepareFlashbackToVersion:
@@ -559,6 +563,11 @@ func (req *Request) LockWaitInfo() *kvrpcpb.GetLockWaitInfoRequest {
 	return req.Req.(*kvrpcpb.GetLockWaitInfoRequest)
 }

+// RequestHealthFeedback returns RequestHealthFeedbackRequest in request.
+func (req *Request) RequestHealthFeedback() *kvrpcpb.RequestHealthFeedbackRequest {
+	return req.Req.(*kvrpcpb.RequestHealthFeedbackRequest)
+}
+
 // FlashbackToVersion returns FlashbackToVersionRequest in request.
 func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest {
 	return req.Req.(*kvrpcpb.FlashbackToVersionRequest)
 }
@@ -642,6 +651,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
 		return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Flush{Flush: req.Flush()}}
 	case CmdBufferBatchGet:
 		return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}}
+	case CmdRequestHealthFeedback:
+		return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_RequestHealthFeedback{RequestHealthFeedback: req.RequestHealthFeedback()}}
 	}
 	return nil
 }
@@ -717,6 +728,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
 		return &Response{Resp: res.Flush}, nil
 	case *tikvpb.BatchCommandsResponse_Response_BufferBatchGet:
 		return &Response{Resp: res.BufferBatchGet}, nil
+	case *tikvpb.BatchCommandsResponse_Response_RequestHealthFeedback:
+		return &Response{Resp: res.RequestHealthFeedback}, nil
 	}
 	panic("unreachable")
 }
@@ -950,6 +963,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
 		p = &kvrpcpb.BufferBatchGetResponse{
 			RegionError: e,
 		}
+	case CmdRequestHealthFeedback:
+		p = &kvrpcpb.RequestHealthFeedbackResponse{
+			RegionError: e,
+		}
 	default:
 		return nil, errors.Errorf("invalid request type %v", req.Type)
 	}
@@ -1124,6 +1141,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
 		resp.Resp, err = client.KvFlush(ctx, req.Flush())
 	case CmdBufferBatchGet:
 		resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet())
+	case CmdRequestHealthFeedback:
+		return nil, errors.New("RequestHealthFeedback can only work in batch rpc mode")
 	default:
 		return nil, errors.Errorf("invalid request type: %v", req.Type)
 	}
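
Note for reviewers (not part of the patch): the sketch below only restates how the new pieces fit together. `RegionCache` now invokes the `WithRequestHealthFeedback` callback from its periodic health tick once a store's TiKV-side feedback has been stale for `tikvSlowScoreActiveUpdateInterval` (30s), and the callback sends `CmdRequestHealthFeedback` so that the feedback carried in the response refreshes the TiKV-side slow score through the normal feedback path; only if that fails does the score decay. The helper name below is hypothetical, and it compiles only inside this module since `internal/locate` is internal — `NewKVStore` above is the real wiring.

```go
package tikv

import (
	"context"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pkg/errors"
	"github.com/tikv/client-go/v2/internal/locate"
	"github.com/tikv/client-go/v2/tikvrpc"
	pd "github.com/tikv/pd/client"
)

// newRegionCacheWithActiveFeedback is an illustrative helper, not new API;
// it mirrors the wiring that NewKVStore performs in this patch.
func newRegionCacheWithActiveFeedback(pdCli pd.Client, rpcCli Client) *locate.RegionCache {
	return locate.NewRegionCache(pdCli, locate.WithRequestHealthFeedback(func(ctx context.Context, addr string) error {
		// CmdRequestHealthFeedback is only served over batch RPC; the health feedback
		// attached to the response is handled like feedback pushed by TiKV, which
		// refreshes the store's tikvSideSlowScore and stops the decay path.
		req := tikvrpc.NewRequest(tikvrpc.CmdRequestHealthFeedback, &kvrpcpb.RequestHealthFeedbackRequest{})
		resp, err := rpcCli.SendRequest(ctx, addr, req, requestHealthFeedbackTimeout)
		if err != nil {
			return err
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return err
		}
		if regionErr != nil {
			return errors.Errorf("requested health feedback but received region error: %s", regionErr.String())
		}
		return nil
	}))
}
```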