diff --git a/go.mod b/go.mod index 9a729123b..625fd3641 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 + github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 5d3571994..9cc6628ef 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 h1:nU9wDeMsID8EWawRQVdmRYcNhUrlI4TKogZhXleG4QQ= -github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900 h1:snIM8DC846ufdlRclITACXfr1kvVIPU4cuQ6w3JVVY4= +github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index d2c9be059..28f70db1a 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 + github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900 github.com/pingcap/tidb v1.1.0-beta.0.20240430081142-7481aa6d0b8b github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.9.0 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 1a87af587..90c5cec06 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -337,8 +337,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 h1:nU9wDeMsID8EWawRQVdmRYcNhUrlI4TKogZhXleG4QQ= -github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900 h1:snIM8DC846ufdlRclITACXfr1kvVIPU4cuQ6w3JVVY4= +github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= 
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E= diff --git a/integration_tests/health_feedback_test.go b/integration_tests/health_feedback_test.go new file mode 100644 index 000000000..286c9a5f4 --- /dev/null +++ b/integration_tests/health_feedback_test.go @@ -0,0 +1,62 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv_test + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikvrpc" +) + +func TestGetHealthFeedback(t *testing.T) { + if !*withTiKV { + return + } + + tikvCluster := NewTestStore(t) + defer tikvCluster.Close() + + // Find any TiKV node + store := tikvCluster.GetRegionCache().GetAllStores()[0] + for _, s := range tikvCluster.GetRegionCache().GetAllStores() { + if s.StoreType() == tikvrpc.TiKV { + store = s + } + } + require.NotNil(t, store) + + client := tikvCluster.GetTiKVClient() + ctx := context.Background() + + for i := 0; i < 3; i++ { + // In normal cases TiKV's slow score should be stable with value 1. Set it to any unstable value and check again + // to ensure the value is indeed received from TiKV. + store.GetHealthStatus().ResetTiKVServerSideSlowScoreForTest(50) + + resp, err := client.SendRequest(ctx, store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdGetHealthFeedback, &kvrpcpb.GetHealthFeedbackRequest{}), time.Second) + require.NoError(t, err) + getHealthFeedbackResp := resp.Resp.(*kvrpcpb.GetHealthFeedbackResponse) + require.NotNil(t, getHealthFeedbackResp) + require.NotEqual(t, uint64(0), getHealthFeedbackResp.GetHealthFeedback().GetFeedbackSeqNo()) + require.Equal(t, int32(1), getHealthFeedbackResp.GetHealthFeedback().GetSlowScore()) + require.Equal(t, store.StoreID(), getHealthFeedbackResp.GetHealthFeedback().GetStoreId()) + // Updated in batch RPC stream. + require.Equal(t, int64(1), store.GetHealthStatus().GetHealthStatusDetail().TiKVSideSlowScore) + } +} diff --git a/internal/client/client.go b/internal/client/client.go index 369f0a143..8772cb159 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -51,6 +51,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/debugpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" @@ -110,7 +111,7 @@ type Client interface { // ClientEventListener is a listener to handle events produced by `Client`. type ClientEventListener interface { // OnHealthFeedback is called when `Client` receives a response that carries the HealthFeedback information. 
- OnHealthFeedback(feedback *tikvpb.HealthFeedback) + OnHealthFeedback(feedback *kvrpcpb.HealthFeedback) } // ClientExt is a client has extended interfaces. diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index c54583945..cfbf88cc5 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -44,6 +44,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -783,7 +784,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport } } -func (c *batchCommandsClient) onHealthFeedback(feedback *tikvpb.HealthFeedback) { +func (c *batchCommandsClient) onHealthFeedback(feedback *kvrpcpb.HealthFeedback) { if h := c.eventListener.Load(); h != nil { (*h).OnHealthFeedback(feedback) } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 1491c09f7..0ee82bb23 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -823,16 +823,16 @@ func TestPrioritySentLimit(t *testing.T) { } type testClientEventListener struct { - healthFeedbackCh chan *tikvpb.HealthFeedback + healthFeedbackCh chan *kvrpcpb.HealthFeedback } func newTestClientEventListener() *testClientEventListener { return &testClientEventListener{ - healthFeedbackCh: make(chan *tikvpb.HealthFeedback, 100), + healthFeedbackCh: make(chan *kvrpcpb.HealthFeedback, 100), } } -func (l *testClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) { +func (l *testClientEventListener) OnHealthFeedback(feedback *kvrpcpb.HealthFeedback) { l.healthFeedbackCh <- feedback } diff --git a/internal/client/mockserver/mock_tikv_service.go b/internal/client/mockserver/mock_tikv_service.go index e0964f735..2479427bf 100644 --- a/internal/client/mockserver/mock_tikv_service.go +++ b/internal/client/mockserver/mock_tikv_service.go @@ -99,7 +99,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { err = ss.Send(&tikvpb.BatchCommandsResponse{ Responses: responses, RequestIds: req.GetRequestIds(), - HealthFeedback: &tikvpb.HealthFeedback{ + HealthFeedback: &kvrpcpb.HealthFeedback{ StoreId: 1, FeedbackSeqNo: feedbackSeq, SlowScore: 1, diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index b74c10af5..0c7d97305 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -55,7 +55,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/config" @@ -631,6 +630,8 @@ type RegionCache struct { codec apicodec.Codec enableForwarding bool + requestHealthFeedbackCallback func(ctx context.Context, addr string) error + mu regionIndexMu stores storeCache @@ -642,7 +643,8 @@ type RegionCache struct { } type regionCacheOptions struct { - noHealthTick bool + noHealthTick bool + requestHealthFeedbackCallback func(ctx context.Context, addr string) error } type RegionCacheOpt func(*regionCacheOptions) @@ -651,6 +653,12 @@ func RegionCacheNoHealthTick(o *regionCacheOptions) { o.noHealthTick = true } +func WithRequestHealthFeedbackCallback(callback func(ctx context.Context, addr string) error) RegionCacheOpt { + return func(options *regionCacheOptions) { + options.requestHealthFeedbackCallback = callback + } +} 
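A minimal usage sketch (illustration only, not part of the patch) of the new functional option: it assumes the snippet sits inside package locate (for example in a test) where context and pd are already imported; fakePD and the no-op callback are stand-ins for the real pd.Client and for the callback that tikv.NewKVStore wires up later in this diff.

// Sketch only: threading a health-feedback callback into the cache via the new option.
func newCacheWithActiveFeedback(fakePD pd.Client) *RegionCache {
	fetch := func(ctx context.Context, addr string) error {
		// The real callback (wired up in tikv/kv.go later in this diff) sends a
		// CmdGetHealthFeedback request to addr through the KV client.
		return nil
	}
	// WithRequestHealthFeedbackCallback stores the callback in regionCacheOptions;
	// checkAndUpdateStoreHealthStatus later passes it into StoreHealthStatus.tick.
	return NewRegionCache(fakePD, WithRequestHealthFeedbackCallback(fetch))
}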
+ // NewRegionCache creates a RegionCache. func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { var options regionCacheOptions @@ -659,7 +667,8 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { } c := &RegionCache{ - pdClient: pdClient, + pdClient: pdClient, + requestHealthFeedbackCallback: options.requestHealthFeedbackCallback, } c.codec = apicodec.NewCodecV1(apicodec.ModeRaw) @@ -704,7 +713,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { return false }, time.Duration(refreshStoreInterval/4)*time.Second, c.stores.getCheckStoreEvents()) if !options.noHealthTick { - c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) + c.bg.schedule(c.checkAndUpdateStoreHealthStatus, time.Duration(refreshStoreInterval/4)*time.Second) } c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second) if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 { @@ -2618,7 +2627,7 @@ func (r *Region) ContainsByEnd(key []byte) bool { } // checkAndUpdateStoreHealthStatus checks and updates health stats on each store. -func (c *RegionCache) checkAndUpdateStoreHealthStatus() { +func (c *RegionCache) checkAndUpdateStoreHealthStatus(ctx context.Context, now time.Time) bool { defer func() { r := recover() if r != nil { @@ -2630,16 +2639,18 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() { } } }() - healthDetails := make(map[uint64]HealthStatusDetail) - now := time.Now() + var stores []*Store c.stores.forEach(func(store *Store) { - store.healthStatus.tick(now) - healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail() + stores = append(stores, store) }) - for store, details := range healthDetails { - metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.ClientSideSlowScore)) - metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.TiKVSideSlowScore)) + for _, store := range stores { + store.healthStatus.tick(ctx, now, store, c.requestHealthFeedbackCallback) + healthDetails := store.healthStatus.GetHealthStatusDetail() + metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.ClientSideSlowScore)) + metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.TiKVSideSlowScore)) } + + return false } // reportStoreReplicaFlows reports the statistics on the related replicaFlowsType. @@ -2662,7 +2673,7 @@ func contains(startKey, endKey, key []byte) bool { (bytes.Compare(key, endKey) < 0 || len(endKey) == 0) } -func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) { +func (c *RegionCache) onHealthFeedback(feedback *kvrpcpb.HealthFeedback) { store, ok := c.stores.get(feedback.GetStoreId()) if !ok { logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId())) @@ -2683,6 +2694,6 @@ type regionCacheClientEventListener struct { } // OnHealthFeedback implements the `client.ClientEventListener` interface. 
-func (l *regionCacheClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) { +func (l *regionCacheClientEventListener) OnHealthFeedback(feedback *kvrpcpb.HealthFeedback) { l.c.onHealthFeedback(feedback) } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 544302cae..4cd35b9de 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -49,14 +49,15 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/apicodec" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" uatomic "go.uber.org/atomic" ) @@ -2088,10 +2089,14 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { } func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { + store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil) + store.livenessState = uint32(reachable) + ctx := context.Background() + stats := newStoreHealthStatus(1) s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) now := time.Now() - stats.tick(now) + stats.tick(ctx, now, store, nil) s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) s.False(stats.tikvSideSlowScore.hasTiKVFeedback.Load()) s.False(stats.IsSlow()) @@ -2108,22 +2113,26 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { s.True(stats.IsSlow()) now = now.Add(time.Minute * 2) - stats.tick(now) + stats.tick(ctx, now, store, nil) s.Equal(int64(60), stats.GetHealthStatusDetail().TiKVSideSlowScore) s.False(stats.IsSlow()) now = now.Add(time.Minute * 3) - stats.tick(now) + stats.tick(ctx, now, store, nil) s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore) s.False(stats.IsSlow()) now = now.Add(time.Minute) - stats.tick(now) + stats.tick(ctx, now, store, nil) s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore) s.False(stats.IsSlow()) } func (s *testRegionCacheSuite) TestStoreHealthStatus() { + store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil) + store.livenessState = uint32(reachable) + ctx := context.Background() + stats := newStoreHealthStatus(1) now := time.Now() s.False(stats.IsSlow()) @@ -2131,7 +2140,7 @@ func (s *testRegionCacheSuite) TestStoreHealthStatus() { for !stats.clientSideSlowScore.isSlow() { stats.clientSideSlowScore.recordSlowScoreStat(time.Minute) } - stats.tick(now) + stats.tick(ctx, now, store, nil) s.True(stats.IsSlow()) s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore) @@ -2142,7 +2151,7 @@ func (s *testRegionCacheSuite) TestStoreHealthStatus() { for stats.clientSideSlowScore.isSlow() { stats.clientSideSlowScore.recordSlowScoreStat(time.Millisecond) - stats.tick(now) + stats.tick(ctx, now, store, nil) } s.True(stats.IsSlow()) s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore) @@ -2160,7 +2169,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { s.True(exists) s.False(store1.healthStatus.IsSlow()) - feedbackMsg := &tikvpb.HealthFeedback{ + feedbackMsg := &kvrpcpb.HealthFeedback{ StoreId: s.store1, 
FeedbackSeqNo: 1, SlowScore: 100, @@ -2169,7 +2178,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { s.True(store1.healthStatus.IsSlow()) s.Equal(int64(100), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) - feedbackMsg = &tikvpb.HealthFeedback{ + feedbackMsg = &kvrpcpb.HealthFeedback{ StoreId: s.store1, FeedbackSeqNo: 2, SlowScore: 90, @@ -2178,7 +2187,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { s.cache.onHealthFeedback(feedbackMsg) s.Equal(int64(100), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) - feedbackMsg = &tikvpb.HealthFeedback{ + feedbackMsg = &kvrpcpb.HealthFeedback{ StoreId: s.store1, FeedbackSeqNo: 3, SlowScore: 90, @@ -2187,7 +2196,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { s.cache.onHealthFeedback(feedbackMsg) s.Equal(int64(90), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) - feedbackMsg = &tikvpb.HealthFeedback{ + feedbackMsg = &kvrpcpb.HealthFeedback{ StoreId: s.store1, FeedbackSeqNo: 4, SlowScore: 50, diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 230bb9a51..83f3a9883 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -546,6 +546,10 @@ func (s *mockTikvGrpcServer) KvBufferBatchGet(context.Context, *kvrpcpb.BufferBa return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvrpcpb.GetHealthFeedbackRequest) (*kvrpcpb.GetHealthFeedbackResponse, error) { + return nil, errors.New("unreachable") +} + func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() { // prepare a mock tikv grpc server addr := "localhost:56341" diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 1de2e43a5..a59cd9686 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -3101,7 +3101,7 @@ func (s *testReplicaSelectorSuite) resetStoreState() { for _, store := range rc.getStore().stores { store.loadStats.Store(nil) store.healthStatus.clientSideSlowScore.resetSlowScore() - store.healthStatus.resetTiKVServerSideSlowScoreForTest() + store.healthStatus.ResetTiKVServerSideSlowScoreForTest(1) store.healthStatus.updateSlowFlag() atomic.StoreUint32(&store.livenessState, uint32(reachable)) store.setResolveState(resolved) diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 9d642e8da..022c64db3 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -18,13 +18,14 @@ import ( "context" "fmt" "math" + "strconv" "strings" "sync" "sync/atomic" "time" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" @@ -260,6 +261,11 @@ func newUninitializedStore(id uint64) *Store { } } +// StoreType returns the type of the store. +func (s *Store) StoreType() tikvrpc.EndpointType { + return s.storeType +} + // IsTiFlash returns true if the storeType is TiFlash func (s *Store) IsTiFlash() bool { return s.storeType == tikvrpc.TiFlash @@ -324,6 +330,11 @@ func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool { return true } +// GetHealthStatus returns the health status of the store. This is exported for test purpose. 
+func (s *Store) GetHealthStatus() *StoreHealthStatus { + return s.healthStatus +} + func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) { for _, label := range labels { if label.GetKey() == key && label.GetValue() == val { @@ -822,7 +833,7 @@ const ( tikvSlowScoreSlowThreshold int64 = 80 tikvSlowScoreUpdateInterval = time.Millisecond * 100 - tikvSlowScoreUpdateFromPDInterval = time.Minute + tikvSlowScoreActiveUpdateInterval = time.Second * 15 ) type StoreHealthStatus struct { @@ -880,9 +891,10 @@ func (s *StoreHealthStatus) GetHealthStatusDetail() HealthStatusDetail { // tick updates the health status that changes over time, such as slow score's decaying, etc. This function is expected // to be called periodically. -func (s *StoreHealthStatus) tick(now time.Time) { +func (s *StoreHealthStatus) tick(ctx context.Context, now time.Time, store *Store, requestHealthFeedbackCallback func(ctx context.Context, addr string) error) { + metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "tick").Inc() s.clientSideSlowScore.updateSlowScore() - s.updateTiKVServerSideSlowScoreOnTick(now) + s.updateTiKVServerSideSlowScoreOnTick(ctx, now, store, requestHealthFeedbackCallback) s.updateSlowFlag() } @@ -900,17 +912,55 @@ func (s *StoreHealthStatus) markAlreadySlow() { // updateTiKVServerSideSlowScoreOnTick updates the slow score actively, which is expected to be a periodic job. // It skips updating if the last update time didn't elapse long enough, or it's being updated concurrently. -func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) { +func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(ctx context.Context, now time.Time, store *Store, requestHealthFeedbackCallback func(ctx context.Context, addr string) error) { if !s.tikvSideSlowScore.hasTiKVFeedback.Load() { // Do nothing if no feedback has been received from this store yet. return } - lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() - if lastUpdateTime == nil || now.Sub(*lastUpdateTime) < tikvSlowScoreUpdateFromPDInterval { - // If the first feedback is + + // Skip the tick if the store's slow score is 1, as it's likely to be the normal case where a healthy store is not + // being accessed. + if s.tikvSideSlowScore.score.Load() <= 1 { + return + } + + needRefreshing := func() bool { + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() + if lastUpdateTime == nil { + // If the first feedback hasn't been received yet, assume the store doesn't support feeding back and skip the tick. + return false + } + + return now.Sub(*lastUpdateTime) >= tikvSlowScoreActiveUpdateInterval + } + + if !needRefreshing() { return } + // If not updated for too long, try to explicitly fetch it from TiKV. + // Note that this can't be done while holding the mutex, because the updating is done by the client when receiving + // the response (in the same way as handling the feedback information pushed from TiKV), which needs to acquire the + // mutex.
+ if requestHealthFeedbackCallback != nil && store.getLivenessState() == reachable { + addr := store.GetAddr() + if len(addr) == 0 { + logutil.Logger(ctx).Warn("skip actively request health feedback info from store due to unknown addr", zap.Uint64("storeID", store.StoreID())) + } else { + metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "active_update").Inc() + err := requestHealthFeedbackCallback(ctx, store.GetAddr()) + if err != nil { + metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "active_update_err").Inc() + logutil.Logger(ctx).Warn("actively request health feedback info from store got error", zap.Uint64("storeID", store.StoreID()), zap.Error(err)) + } + } + + // Continue decaying the score only if the active update didn't refresh it. + if !needRefreshing() { + return + } + } + if !s.tikvSideSlowScore.TryLock() { // It must be being updated concurrently. return @@ -918,16 +968,13 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) { defer s.tikvSideSlowScore.Unlock() // Reload update time as it might be updated concurrently before acquiring mutex - lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() elapsed := now.Sub(*lastUpdateTime) - if elapsed < tikvSlowScoreUpdateFromPDInterval { + if elapsed < tikvSlowScoreActiveUpdateInterval { return } - // TODO: Try to get store status from PD here. But it's not mandatory. - // Don't forget to update tests if getting slow score from PD is implemented here. - - // If updating from PD is not successful: decay the slow score. + // If requesting from TiKV is not successful: decay the slow score. score := s.tikvSideSlowScore.score.Load() if score < 1 { return @@ -949,6 +996,20 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime lastScore := s.tikvSideSlowScore.score.Load() if lastScore == score { + // The lastUpdateTime still needs to be updated even when the score is unchanged, so that it's possible to + // tell whether the slow score hasn't been refreshed for too long and therefore needs to be explicitly + // fetched from TiKV. + // It can be safely skipped if the score is 1, as explicitly fetching the slow score won't be performed in + // that case. Note that it must be updated while holding the mutex. + if score > 1 { + // Skip if not locked as it's being updated concurrently. + if s.tikvSideSlowScore.TryLock() { + newUpdateTime := new(time.Time) + *newUpdateTime = currTime + s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) + s.tikvSideSlowScore.Unlock() + } + } return } @@ -980,9 +1041,12 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) } -func (s *StoreHealthStatus) resetTiKVServerSideSlowScoreForTest() { +// ResetTiKVServerSideSlowScoreForTest resets the TiKV-side slow score information and makes it expired so that the +// next update can be effective. A new score should be passed to the function. For a store that's running normally +// without any sign of being slow, the value should be 1.
+func (s *StoreHealthStatus) ResetTiKVServerSideSlowScoreForTest(score int64) { s.setTiKVSlowScoreLastUpdateTimeForTest(time.Now().Add(-time.Hour * 2)) - s.updateTiKVServerSideSlowScore(1, time.Now().Add(-time.Hour)) + s.updateTiKVServerSideSlowScore(score, time.Now().Add(-time.Hour)) } func (s *StoreHealthStatus) updateSlowFlag() { @@ -1002,7 +1066,7 @@ func (s *StoreHealthStatus) setTiKVSlowScoreLastUpdateTimeForTest(lastUpdateTime s.tikvSideSlowScore.lastUpdateTime.Store(&lastUpdateTime) } -func (s *Store) recordHealthFeedback(feedback *tikvpb.HealthFeedback) { +func (s *Store) recordHealthFeedback(feedback *kvrpcpb.HealthFeedback) { // Note that the `FeedbackSeqNo` field of `HealthFeedback` is not used yet. It's a monotonic value that can help // to drop out-of-order feedback messages. But it's not checked for now since it's not very necessary to receive // only a slow score. It's prepared for possible use in the future. diff --git a/metrics/metrics.go b/metrics/metrics.go index 6d6c00720..0cf74ea62 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -103,6 +103,7 @@ var ( TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec + TiKVHealthFeedbackOpsCounter *prometheus.CounterVec TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec TiKVStaleReadCounter *prometheus.CounterVec TiKVStaleReadReqCounter *prometheus.CounterVec @@ -726,6 +727,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblStore}) + TiKVHealthFeedbackOpsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "health_feedback_ops_counter", + Help: "Counter of operations about TiKV health feedback", + ConstLabels: constLabels, + }, []string{LblScope, LblType}) + TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -868,6 +878,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) prometheus.MustRegister(TiKVFeedbackSlowScoreGauge) + prometheus.MustRegister(TiKVHealthFeedbackOpsCounter) prometheus.MustRegister(TiKVPreferLeaderFlowsGauge) prometheus.MustRegister(TiKVStaleReadCounter) prometheus.MustRegister(TiKVStaleReadReqCounter) diff --git a/tikv/kv.go b/tikv/kv.go index aea5c28d7..1afedc294 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -235,6 +235,28 @@ func loadOption(store *KVStore, opt ...Option) { } } +const getHealthFeedbackTimeout = time.Second * 2 + +func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvClient Client) error { + // When batch RPC is enabled (`MaxBatchSize` > 0), a `GetHealthFeedback` RPC call also causes TiKV to send the + // health feedback information via the `BatchCommandsResponse`, which will be handled by the batch client. + // Therefore the same information carried in the response doesn't need to be handled in this case. And as we're + // currently not supporting the health feedback mechanism without batch RPC enabled, we do not use the information + // carried in the `resp` here.
+ resp, err := tikvClient.SendRequest(ctx, addr, tikvrpc.NewRequest(tikvrpc.CmdGetHealthFeedback, &kvrpcpb.GetHealthFeedbackRequest{}), getHealthFeedbackTimeout) + if err != nil { + return err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return err + } + if regionErr != nil { + return errors.Errorf("requested health feedback from store but received region error: %s", regionErr.String()) + } + return nil +} + // NewKVStore creates a new TiKV store instance. func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) { o, err := oracles.NewPdOracle(pdClient, defaultOracleUpdateInterval) @@ -242,7 +264,9 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl return nil, err } ctx, cancel := context.WithCancel(context.Background()) - regionCache := locate.NewRegionCache(pdClient) + regionCache := locate.NewRegionCache(pdClient, locate.WithRequestHealthFeedbackCallback(func(ctx context.Context, addr string) error { + return requestHealthFeedbackFromKVClient(ctx, addr, tikvclient) + })) store := &KVStore{ clusterID: pdClient.GetClusterID(context.TODO()), uuid: uuid, diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 50821516e..350c5b7c7 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -99,6 +99,8 @@ const ( CmdStoreSafeTS CmdLockWaitInfo + CmdGetHealthFeedback + CmdCop CmdType = 512 + iota CmdCopStream CmdBatchCop @@ -217,6 +219,8 @@ func (t CmdType) String() string { return "StoreSafeTS" case CmdLockWaitInfo: return "LockWaitInfo" + case CmdGetHealthFeedback: + return "GetHealthFeedback" case CmdFlashbackToVersion: return "FlashbackToVersion" case CmdPrepareFlashbackToVersion: @@ -559,6 +563,11 @@ func (req *Request) LockWaitInfo() *kvrpcpb.GetLockWaitInfoRequest { return req.Req.(*kvrpcpb.GetLockWaitInfoRequest) } +// GetHealthFeedback returns GetHealthFeedbackRequest in request. +func (req *Request) GetHealthFeedback() *kvrpcpb.GetHealthFeedbackRequest { + return req.Req.(*kvrpcpb.GetHealthFeedbackRequest) +} + // FlashbackToVersion returns FlashbackToVersionRequest in request. 
func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest { return req.Req.(*kvrpcpb.FlashbackToVersionRequest) @@ -642,6 +651,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Flush{Flush: req.Flush()}} case CmdBufferBatchGet: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}} + case CmdGetHealthFeedback: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_GetHealthFeedback{GetHealthFeedback: req.GetHealthFeedback()}} } return nil } @@ -717,6 +728,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res return &Response{Resp: res.Flush}, nil case *tikvpb.BatchCommandsResponse_Response_BufferBatchGet: return &Response{Resp: res.BufferBatchGet}, nil + case *tikvpb.BatchCommandsResponse_Response_GetHealthFeedback: + return &Response{Resp: res.GetHealthFeedback}, nil } panic("unreachable") } @@ -950,6 +963,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { p = &kvrpcpb.BufferBatchGetResponse{ RegionError: e, } + case CmdGetHealthFeedback: + p = &kvrpcpb.GetHealthFeedbackResponse{ + RegionError: e, + } default: return nil, errors.Errorf("invalid request type %v", req.Type) } @@ -1124,6 +1141,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.KvFlush(ctx, req.Flush()) case CmdBufferBatchGet: resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet()) + case CmdGetHealthFeedback: + resp.Resp, err = client.GetHealthFeedback(ctx, req.GetHealthFeedback()) default: return nil, errors.Errorf("invalid request type: %v", req.Type) }
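For reference, a minimal sketch (not part of the patch) of driving the new CmdGetHealthFeedback plumbing end to end from package tikv, mirroring requestHealthFeedbackFromKVClient and the integration test above. fetchFeedbackOnce is a hypothetical helper name, and the 2-second timeout mirrors getHealthFeedbackTimeout.

package tikv

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pkg/errors"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// fetchFeedbackOnce issues a single GetHealthFeedback RPC to the given store
// address and returns the decoded feedback message.
func fetchFeedbackOnce(ctx context.Context, client Client, addr string) (*kvrpcpb.HealthFeedback, error) {
	req := tikvrpc.NewRequest(tikvrpc.CmdGetHealthFeedback, &kvrpcpb.GetHealthFeedbackRequest{})
	resp, err := client.SendRequest(ctx, addr, req, 2*time.Second)
	if err != nil {
		return nil, err
	}
	regionErr, err := resp.GetRegionError()
	if err != nil {
		return nil, err
	}
	if regionErr != nil {
		return nil, errors.Errorf("get health feedback returned region error: %s", regionErr.String())
	}
	// With batch RPC enabled, the same feedback is also pushed through
	// BatchCommandsResponse and dispatched to ClientEventListener.OnHealthFeedback,
	// which is how the region cache keeps TiKVSideSlowScore up to date.
	return resp.Resp.(*kvrpcpb.GetHealthFeedbackResponse).GetHealthFeedback(), nil
}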