Skip to content

Commit

Permalink
Support actively requesting update health feedback information by cal…
Browse files Browse the repository at this point in the history
…ling RPC to TiKV

Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Apr 11, 2024
1 parent 714958c commit 2fd1e93
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 33 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20240408102041-05784a0bc698
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/MyonKeminta/kvproto v0.0.0-20240408102041-05784a0bc698 h1:3eRDRr/Z6I4x62V6i1/pO/ZYVeqm5Sk2Rctr/2AUSNM=
github.com/MyonKeminta/kvproto v0.0.0-20240408102041-05784a0bc698/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -74,8 +76,6 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 h1:nU9wDeMsID8EWawRQVdmRYcNhUrlI4TKogZhXleG4QQ=
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
34 changes: 23 additions & 11 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ type RegionCache struct {
codec apicodec.Codec
enableForwarding bool

requestHealthFeedbackCallback func(ctx context.Context, addr string) error

mu regionIndexMu

storeMu struct {
Expand All @@ -656,7 +658,8 @@ type RegionCache struct {
}

type regionCacheOptions struct {
noHealthTick bool
noHealthTick bool
requestHealthFeedback func(ctx context.Context, addr string) error
}

type RegionCacheOpt func(*regionCacheOptions)
Expand All @@ -665,6 +668,12 @@ func RegionCacheNoHealthTick(o *regionCacheOptions) {
o.noHealthTick = true
}

// WithRequestHealthFeedback returns a RegionCacheOpt that records callback in
// the region cache options as the function used to actively request health
// feedback from the store at a given address.
//
// The callback receives the context of the caller and the target store
// address, and reports failure through its returned error.
func WithRequestHealthFeedback(callback func(ctx context.Context, addr string) error) RegionCacheOpt {
	apply := func(o *regionCacheOptions) {
		o.requestHealthFeedback = callback
	}
	return apply
}

// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
var options regionCacheOptions
Expand All @@ -673,7 +682,8 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}

c := &RegionCache{
pdClient: pdClient,
pdClient: pdClient,
requestHealthFeedbackCallback: options.requestHealthFeedback,
}

c.codec = apicodec.NewCodecV1(apicodec.ModeRaw)
Expand Down Expand Up @@ -721,7 +731,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
c.bg.schedule(c.checkAndUpdateStoreHealthStatus, time.Duration(refreshStoreInterval/4)*time.Second)
}
c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
Expand Down Expand Up @@ -2645,7 +2655,7 @@ func (r *Region) ContainsByEnd(key []byte) bool {
}

// checkAndUpdateStoreHealthStatus checks and updates health stats on each store.
func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
func (c *RegionCache) checkAndUpdateStoreHealthStatus(ctx context.Context, now time.Time) bool {
defer func() {
r := recover()
if r != nil {
Expand All @@ -2657,16 +2667,18 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
}
}
}()
healthDetails := make(map[uint64]HealthStatusDetail)
now := time.Now()
var stores []*Store
c.forEachStore(func(store *Store) {
store.healthStatus.tick(now)
healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail()
stores = append(stores, store)
})
for store, details := range healthDetails {
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.ClientSideSlowScore))
metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.TiKVSideSlowScore))
for _, store := range stores {
store.healthStatus.tick(ctx, now, store, c.requestHealthFeedbackCallback)
healthDetails := store.healthStatus.GetHealthStatusDetail()
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.ClientSideSlowScore))
metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.TiKVSideSlowScore))
}

return false
}

// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
Expand Down
21 changes: 15 additions & 6 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
)
Expand Down Expand Up @@ -2088,10 +2089,14 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
}

func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil)
store.livenessState = uint32(reachable)
ctx := context.Background()

stats := newStoreHealthStatus(1)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
now := time.Now()
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
s.False(stats.tikvSideSlowScore.hasTiKVFeedback.Load())
s.False(stats.IsSlow())
Expand All @@ -2108,30 +2113,34 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
s.True(stats.IsSlow())

now = now.Add(time.Minute * 2)
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.Equal(int64(60), stats.GetHealthStatusDetail().TiKVSideSlowScore)
s.False(stats.IsSlow())

now = now.Add(time.Minute * 3)
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore)
s.False(stats.IsSlow())

now = now.Add(time.Minute)
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore)
s.False(stats.IsSlow())
}

func (s *testRegionCacheSuite) TestStoreHealthStatus() {
store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil)
store.livenessState = uint32(reachable)
ctx := context.Background()

stats := newStoreHealthStatus(1)
now := time.Now()
s.False(stats.IsSlow())

for !stats.clientSideSlowScore.isSlow() {
stats.clientSideSlowScore.recordSlowScoreStat(time.Minute)
}
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.True(stats.IsSlow())
s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore)

Expand All @@ -2142,7 +2151,7 @@ func (s *testRegionCacheSuite) TestStoreHealthStatus() {

for stats.clientSideSlowScore.isSlow() {
stats.clientSideSlowScore.recordSlowScoreStat(time.Millisecond)
stats.tick(now)
stats.tick(ctx, now, store, nil)
}
s.True(stats.IsSlow())
s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore)
Expand Down
57 changes: 44 additions & 13 deletions internal/locate/store_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -790,7 +791,7 @@ const (
tikvSlowScoreSlowThreshold int64 = 80

tikvSlowScoreUpdateInterval = time.Millisecond * 100
tikvSlowScoreUpdateFromPDInterval = time.Minute
tikvSlowScoreActiveUpdateInterval = time.Second * 30
)

type StoreHealthStatus struct {
Expand Down Expand Up @@ -848,9 +849,10 @@ func (s *StoreHealthStatus) GetHealthStatusDetail() HealthStatusDetail {

// tick updates the health status that changes over time, such as slow score's decaying, etc. This function is expected
// to be called periodically.
func (s *StoreHealthStatus) tick(now time.Time) {
func (s *StoreHealthStatus) tick(ctx context.Context, now time.Time, store *Store, requestHealthFeedbackCallback func(ctx context.Context, addr string) error) {
metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "tick").Inc()
s.clientSideSlowScore.updateSlowScore()
s.updateTiKVServerSideSlowScoreOnTick(now)
s.updateTiKVServerSideSlowScoreOnTick(ctx, now, store, requestHealthFeedbackCallback)
s.updateSlowFlag()
}

Expand All @@ -868,34 +870,63 @@ func (s *StoreHealthStatus) markAlreadySlow() {

// updateTiKVServerSideSlowScoreOnTick updates the slow score actively, which is expected to be a periodic job.
// It skips updating if the last update time didn't elapse long enough, or it's being updated concurrently.
func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) {
func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(ctx context.Context, now time.Time, store *Store, requestHealthFeedbackCallback func(ctx context.Context, addr string) error) {
if !s.tikvSideSlowScore.hasTiKVFeedback.Load() {
// Do nothing if no feedback has been received from this store yet.
return
}
lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load()
if lastUpdateTime == nil || now.Sub(*lastUpdateTime) < tikvSlowScoreUpdateFromPDInterval {
// If the first feedback is

needRefreshing := func() bool {
lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load()
if lastUpdateTime == nil {
// If the first feedback hasn't been received yet, assume the store doesn't support feeding back and skip the tick.
return false
}

return now.Sub(*lastUpdateTime) >= tikvSlowScoreActiveUpdateInterval
}

if !needRefreshing() {
return
}

// If not updated for too long, try to actively fetch it from TiKV.
// Note that this can't be done while holding the mutex, because the updating is done by the client when receiving
// the response (in the same way as handling the feedback information pushed from TiKV), which needs acquiring the
// mutex.
if requestHealthFeedbackCallback != nil && store.getLivenessState() == reachable {
addr := store.GetAddr()
if len(addr) == 0 {
logutil.Logger(ctx).Warn("skip actively request health feedback info from store due to unknown addr", zap.Uint64("storeID", store.StoreID()))
} else {
metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "active_update").Inc()
err := requestHealthFeedbackCallback(ctx, store.GetAddr())
if err != nil {
metrics.TiKVHealthFeedbackOpsCounter.WithLabelValues(strconv.FormatUint(store.StoreID(), 10), "active_update_err").Inc()
logutil.Logger(ctx).Warn("actively request health feedback info from store got error", zap.Uint64("storeID", store.StoreID()), zap.Error(err))
}
}

// Continue if active updating is unsuccessful.
if !needRefreshing() {
return
}
}

if !s.tikvSideSlowScore.TryLock() {
// It must be being updated concurrently.
return
}
defer s.tikvSideSlowScore.Unlock()

// Reload update time as it might be updated concurrently before acquiring mutex
lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load()
lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load()
elapsed := now.Sub(*lastUpdateTime)
if elapsed < tikvSlowScoreUpdateFromPDInterval {
if elapsed < tikvSlowScoreActiveUpdateInterval {
return
}

// TODO: Try to get store status from PD here. But it's not mandatory.
// Don't forget to update tests if getting slow score from PD is implemented here.

// If updating from PD is not successful: decay the slow score.
// If requesting from TiKV is not successful: decay the slow score.
score := s.tikvSideSlowScore.score.Load()
if score < 1 {
return
Expand Down
11 changes: 11 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var (
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec
TiKVHealthFeedbackOpsCounter *prometheus.CounterVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
Expand Down Expand Up @@ -716,6 +717,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{LblStore})

TiKVHealthFeedbackOpsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "health_feedback_ops_counter",
Help: "Counter of operations about TiKV health feedback",
ConstLabels: constLabels,
}, []string{LblScope, LblType})

TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -857,6 +867,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
prometheus.MustRegister(TiKVStoreSlowScoreGauge)
prometheus.MustRegister(TiKVFeedbackSlowScoreGauge)
prometheus.MustRegister(TiKVHealthFeedbackOpsCounter)
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
prometheus.MustRegister(TiKVStaleReadCounter)
prometheus.MustRegister(TiKVStaleReadReqCounter)
Expand Down
21 changes: 20 additions & 1 deletion tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,33 @@ func loadOption(store *KVStore, opt ...Option) {
}
}

const requestHealthFeedbackTimeout = time.Second * 2

// requestHealthFeedbackFromKVClient sends a CmdRequestHealthFeedback request
// to the store at addr through tikvClient, waiting at most
// requestHealthFeedbackTimeout for the reply.
//
// It returns the error from sending the request, the error from extracting
// the region error out of the response, or a new error describing the region
// error if the response carries one. It returns nil otherwise. The response
// payload itself is not inspected here.
func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvClient Client) error {
	req := tikvrpc.NewRequest(tikvrpc.CmdRequestHealthFeedback, &kvrpcpb.RequestHealthFeedbackRequest{})
	resp, err := tikvClient.SendRequest(ctx, addr, req, requestHealthFeedbackTimeout)
	if err != nil {
		return err
	}

	regionErr, err := resp.GetRegionError()
	switch {
	case err != nil:
		// The region error could not be extracted from the response.
		return err
	case regionErr != nil:
		// This request is store-scoped, so a region error counts as a failure.
		return errors.Errorf("requested health feedback from store but received region error: %s", regionErr.String())
	}
	return nil
}

// NewKVStore creates a new TiKV store instance.
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) {
o, err := oracles.NewPdOracle(pdClient, defaultOracleUpdateInterval)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
regionCache := locate.NewRegionCache(pdClient)
regionCache := locate.NewRegionCache(pdClient, locate.WithRequestHealthFeedback(func(ctx context.Context, addr string) error {
return requestHealthFeedbackFromKVClient(ctx, addr, tikvclient)
}))
store := &KVStore{
clusterID: pdClient.GetClusterID(context.TODO()),
uuid: uuid,
Expand Down
Loading

0 comments on commit 2fd1e93

Please sign in to comment.