Skip to content

Commit

Permalink
sstable: shard categorized iterator stats
Browse files Browse the repository at this point in the history
Shard the categorized iterator stats to avoid mutex contention in high-read
workloads that are frequently closing sstable iterators.
  • Loading branch information
jbowens committed Oct 24, 2024
1 parent 8bf23da commit cd6ba1c
Showing 1 changed file with 45 additions and 14 deletions.
59 changes: 45 additions & 14 deletions sstable/category_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"slices"
"sync"
"time"
"unsafe"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -96,45 +97,75 @@ type CategoryStatsAggregate struct {
CategoryStats CategoryStats
}

const numCategoryStatsShards = 16

type categoryStatsWithMu struct {
mu sync.Mutex
// Protected by mu.
stats CategoryStatsAggregate
stats CategoryStats
}

// CategoryStatsCollector collects and aggregates the stats per category.
type CategoryStatsCollector struct {
// mu protects additions to statsMap.
mu sync.Mutex
// Category => categoryStatsWithMu.
// Category => *shardedCategoryStats.
statsMap sync.Map
}

// shardedCategoryStats accumulates stats for a category, splitting its stats
// across multiple shards to prevent mutex contention. In high-read workloads,
// contention on the category stats mutex has been observed.
type shardedCategoryStats struct {
Category Category
QoSLevel QoSLevel
shards [numCategoryStatsShards]struct {
categoryStatsWithMu
// Pad each shard to 64 bytes so they don't share a cache line.
_ [64 - unsafe.Sizeof(categoryStatsWithMu{})]byte
}
}

// getStats retrieves the aggregated stats for the category, summing across all
// shards.
func (s *shardedCategoryStats) getStats() CategoryStatsAggregate {
agg := CategoryStatsAggregate{
Category: s.Category,
QoSLevel: s.QoSLevel,
}
for i := range s.shards {
s.shards[i].mu.Lock()
agg.CategoryStats.aggregate(s.shards[i].stats)
s.shards[i].mu.Unlock()
}
return agg
}

func (c *CategoryStatsCollector) reportStats(
category Category, qosLevel QoSLevel, stats CategoryStats,
p uint64, category Category, qosLevel QoSLevel, stats CategoryStats,
) {
v, ok := c.statsMap.Load(category)
if !ok {
c.mu.Lock()
v, _ = c.statsMap.LoadOrStore(category, &categoryStatsWithMu{
stats: CategoryStatsAggregate{Category: category, QoSLevel: qosLevel},
v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{
Category: category,
QoSLevel: qosLevel,
})
c.mu.Unlock()
}
aggStats := v.(*categoryStatsWithMu)
aggStats.mu.Lock()
aggStats.stats.CategoryStats.aggregate(stats)
aggStats.mu.Unlock()

shardedStats := v.(*shardedCategoryStats)
s := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1)
shardedStats.shards[s].mu.Lock()
shardedStats.shards[s].stats.aggregate(stats)
shardedStats.shards[s].mu.Unlock()
}

// GetStats returns the aggregated stats.
func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate {
var stats []CategoryStatsAggregate
c.statsMap.Range(func(_, v any) bool {
aggStats := v.(*categoryStatsWithMu)
aggStats.mu.Lock()
s := aggStats.stats
aggStats.mu.Unlock()
s := v.(*shardedCategoryStats).getStats()
if len(s.Category) == 0 {
s.Category = "_unknown"
}
Expand Down Expand Up @@ -175,6 +206,6 @@ func (accum *iterStatsAccumulator) reportStats(

func (accum *iterStatsAccumulator) close() {
if accum.collector != nil {
accum.collector.reportStats(accum.Category, accum.QoSLevel, accum.stats)
accum.collector.reportStats(uint64(uintptr(unsafe.Pointer(accum))), accum.Category, accum.QoSLevel, accum.stats)
}
}

0 comments on commit cd6ba1c

Please sign in to comment.