statistics: add gc in hot peer cache (tikv#8702) (tikv#8750)
close tikv#8698

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
ti-chi-bot and lhy1024 committed Jan 26, 2025
Parent: 3e55a4f · Commit: 1779190
Showing 10 changed files with 186 additions and 79 deletions.
5 changes: 3 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -61,10 +61,11 @@ type Cluster struct {

 // NewCluster creates a new Cluster
 func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
+	basicCluster := core.NewBasicCluster()
 	clus := &Cluster{
-		BasicCluster:   core.NewBasicCluster(),
+		BasicCluster:   basicCluster,
 		IDAllocator:    mockid.NewIDAllocator(),
-		HotStat:        statistics.NewHotStat(ctx),
+		HotStat:        statistics.NewHotStat(ctx, basicCluster),
 		HotBucketCache: buckets.NewBucketsCache(ctx),
 		PersistOptions: opts,
 		suspectRegions: map[uint64]struct{}{},
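Note that statistics.NewHotStat itself is defined in one of the changed files not rendered in this (truncated) listing. Judging from the new call sites in this file and in server/cluster/cluster.go below, its signature presumably gained the *core.BasicCluster parameter and simply forwards it to NewHotCache. A plausible sketch, not the commit's actual code:

	// Hypothetical sketch: assumes HotStat (package statistics) embeds
	// *HotCache and *StoresStats and only threads the cluster through.
	func NewHotStat(ctx context.Context, cluster *core.BasicCluster) *HotStat {
		return &HotStat{
			HotCache:    NewHotCache(ctx, cluster),
			StoresStats: NewStoresStats(),
		}
	}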
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -230,7 +230,7 @@ func (c *RaftCluster) InitCluster(
 	c.core, c.opt, c.storage, c.id = basicCluster, opt, storage, id
 	c.ctx, c.cancel = context.WithCancel(c.serverCtx)
 	c.labelLevelStats = statistics.NewLabelStatistics()
-	c.hotStat = statistics.NewHotStat(c.ctx)
+	c.hotStat = statistics.NewHotStat(c.ctx, basicCluster)
 	c.hotBuckets = buckets.NewBucketsCache(c.ctx)
 	c.progressManager = progress.NewManager()
 	c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
5 changes: 4 additions & 1 deletion server/cluster/cluster_test.go
@@ -585,7 +585,10 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 	re.NoError(err)
 	cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
 	cluster.coordinator = newCoordinator(ctx, cluster, nil)
-	newTestStores(4, "2.0.0")
+	stores := newTestStores(4, "2.0.0")
+	for _, store := range stores {
+		re.NoError(cluster.PutStore(store.GetMeta()))
+	}
 	peers := []*metapb.Peer{
 		{
 			Id: 1,
18 changes: 18 additions & 0 deletions server/schedulers/hot_region_test.go
@@ -1391,6 +1391,9 @@ func TestHotCacheUpdateCache(t *testing.T) {
 	defer cancel()
 	opt := config.NewTestOptions()
 	tc := mockcluster.NewCluster(ctx, opt)
+	for i := 0; i < 3; i++ {
+		tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+	}
 	tc.SetHotRegionCacheHitsThreshold(0)

 	// For read flow
@@ -1460,6 +1463,9 @@ func TestHotCacheKeyThresholds(t *testing.T) {
 	}()
 	{ // only a few regions
 		tc := mockcluster.NewCluster(ctx, opt)
+		for i := 0; i < 6; i++ {
+			tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+		}
 		tc.SetHotRegionCacheHitsThreshold(0)
 		addRegionInfo(tc, statistics.Read, []testRegionInfo{
 			{1, []uint64{1, 2, 3}, 0, 1, 0},
@@ -1478,6 +1484,9 @@
 	}
 	{ // many regions
 		tc := mockcluster.NewCluster(ctx, opt)
+		for i := 0; i < 3; i++ {
+			tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+		}
 		regions := []testRegionInfo{}
 		for i := 1; i <= 1000; i += 2 {
 			regions = append(regions,
@@ -1533,6 +1542,9 @@ func TestHotCacheByteAndKey(t *testing.T) {
 	defer cancel()
 	opt := config.NewTestOptions()
 	tc := mockcluster.NewCluster(ctx, opt)
+	for i := 0; i < 3; i++ {
+		tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+	}
 	tc.SetHotRegionCacheHitsThreshold(0)
 	statistics.ThresholdsUpdateInterval = 0
 	defer func() {
@@ -1656,6 +1668,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) {
 	tc.SetMaxReplicas(3)
 	tc.SetLocationLabels([]string{"zone", "host"})
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
+	for i := 0; i < 3; i++ {
+		tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+	}
 	sche, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
@@ -1728,6 +1743,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
 	tc := mockcluster.NewCluster(ctx, opt)
 	tc.SetMaxReplicas(3)
 	tc.SetLocationLabels([]string{"zone", "host"})
+	for i := 0; i < 3; i++ {
+		tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+	}
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
 	statistics.ThresholdsUpdateInterval = 0
 	defer func() {
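These test changes, like the cluster_test.go change above, all follow the same pattern: the mock cluster must now actually contain the stores whose hot peers the tests feed in, because the gc pass added to server/statistics/hot_peer_cache.go (below) drops statistics for any store no longer returned by the BasicCluster. A hypothetical helper that captures the repeated loop — addTestStores is not part of the commit, only an illustration:

	// Hypothetical helper (not in the commit): register n mock stores with
	// IDs 1..n so the new gc() pass does not discard their hot-peer stats.
	func addTestStores(tc *mockcluster.Cluster, n int) {
		for i := 0; i < n; i++ {
			tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
		}
	}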
6 changes: 3 additions & 3 deletions server/statistics/hot_cache.go
@@ -30,11 +30,11 @@ type HotCache struct {
 }

 // NewHotCache creates a new hot spot cache.
-func NewHotCache(ctx context.Context) *HotCache {
+func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache {
 	w := &HotCache{
 		ctx:        ctx,
-		writeCache: NewHotPeerCache(Write),
-		readCache:  NewHotPeerCache(Read),
+		writeCache: NewHotPeerCache(cluster, Write),
+		readCache:  NewHotPeerCache(cluster, Read),
 	}
 	go w.updateItems(w.readCache.taskQueue, w.runReadTask)
 	go w.updateItems(w.writeCache.taskQueue, w.runWriteTask)
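The constructor change ripples out to every caller shown above: whoever builds the hot cache must now hold the *core.BasicCluster and pass it in, and the read/write caches both receive it. Since updates are presumably still processed one task at a time per cache via the taskQueue goroutines, the throttled gc needs no extra locking. A minimal caller-side sketch under those assumptions (hypothetical wrapper, not commit code):

	// Hypothetical wrapper illustrating the new call pattern; the name is an assumption.
	func newHotCacheFor(ctx context.Context, basic *core.BasicCluster) *statistics.HotCache {
		return statistics.NewHotCache(ctx, basic)
	}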
38 changes: 36 additions & 2 deletions server/statistics/hot_peer_cache.go
@@ -72,19 +72,21 @@ type thresholds struct {
 // hotPeerCache saves the hot peer's statistics.
 type hotPeerCache struct {
 	kind              RWType
+	cluster           *core.BasicCluster
 	peersOfStore      map[uint64]*TopN               // storeID -> hot peers
 	storesOfRegion    map[uint64]map[uint64]struct{} // regionID -> storeIDs
 	regionsOfStore    map[uint64]map[uint64]struct{} // storeID -> regionIDs
 	topNTTL           time.Duration
 	taskQueue         chan FlowItemTask
 	thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds
-	// TODO: consider to remove store info when store is offline.
+	lastGCTime        time.Time
 }

 // NewHotPeerCache creates a hotPeerCache
-func NewHotPeerCache(kind RWType) *hotPeerCache {
+func NewHotPeerCache(cluster *core.BasicCluster, kind RWType) *hotPeerCache {
 	return &hotPeerCache{
 		kind:              kind,
+		cluster:           cluster,
 		peersOfStore:      make(map[uint64]*TopN),
 		storesOfRegion:    make(map[uint64]map[uint64]struct{}),
 		regionsOfStore:    make(map[uint64]map[uint64]struct{}),
@@ -112,6 +114,9 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
 }

 func (f *hotPeerCache) updateStat(item *HotPeerStat) {
+	defer func() {
+		f.gc()
+	}()
 	switch item.actionType {
 	case Remove:
 		f.removeItem(item)
@@ -525,6 +530,35 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) {
 	}
 }

+func (f *hotPeerCache) gc() {
+	if time.Since(f.lastGCTime) < f.topNTTL {
+		return
+	}
+	f.lastGCTime = time.Now()
+	// remove tombstone stores
+	stores := make(map[uint64]struct{})
+	for _, storeID := range f.cluster.GetStores() {
+		stores[storeID.GetID()] = struct{}{}
+	}
+	for storeID := range f.peersOfStore {
+		if _, ok := stores[storeID]; !ok {
+			delete(f.peersOfStore, storeID)
+			delete(f.regionsOfStore, storeID)
+			delete(f.thresholdsOfStore, storeID)
+		}
+	}
+	// remove expired items
+	for _, peers := range f.peersOfStore {
+		regions := peers.RemoveExpired()
+		for _, regionID := range regions {
+			delete(f.storesOfRegion, regionID)
+			for storeID := range f.regionsOfStore {
+				delete(f.regionsOfStore[storeID], regionID)
+			}
+		}
+	}
+}
+
 func (f *hotPeerCache) coldItem(newItem, oldItem *HotPeerStat) {
 	newItem.HotDegree = oldItem.HotDegree - 1
 	newItem.AntiCount = oldItem.AntiCount - 1
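The new gc() is the heart of the change: it runs at most once per topNTTL (throttled via lastGCTime), builds the set of store IDs currently known to the BasicCluster, deletes the per-store maps of any store that has disappeared (e.g. tombstone stores), and finally evicts expired TopN items and their region bookkeeping. A self-contained Go illustration of that pattern, using plain maps instead of pd's types (an assumed simplification, not the commit's code):

	package main

	import (
		"fmt"
		"time"
	)

	// cache mirrors the shape of the problem: per-store state plus a TTL-throttled gc.
	type cache struct {
		perStore   map[uint64]map[uint64]time.Time // storeID -> regionID -> last update
		ttl        time.Duration
		lastGCTime time.Time
	}

	func (c *cache) gc(liveStores []uint64) {
		if time.Since(c.lastGCTime) < c.ttl {
			return // throttle: at most one full pass per TTL
		}
		c.lastGCTime = time.Now()

		// Step 1: drop state for stores no longer present in the cluster.
		live := make(map[uint64]struct{}, len(liveStores))
		for _, id := range liveStores {
			live[id] = struct{}{}
		}
		for storeID := range c.perStore {
			if _, ok := live[storeID]; !ok {
				delete(c.perStore, storeID)
			}
		}

		// Step 2: evict entries that have not been updated within the TTL.
		for _, regions := range c.perStore {
			for regionID, updatedAt := range regions {
				if time.Since(updatedAt) > c.ttl {
					delete(regions, regionID)
				}
			}
		}
	}

	func main() {
		c := &cache{
			perStore: map[uint64]map[uint64]time.Time{
				1: {100: time.Now()},
				2: {200: time.Now()},
			},
			ttl: time.Minute,
			// lastGCTime is the zero time, so the first gc call is not throttled.
		}
		c.gc([]uint64{1})            // store 2 has left the cluster
		fmt.Println(len(c.perStore)) // prints 1: only store 1 remains
	}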
(The remaining 4 changed files are not shown in this view.)
