Skip to content

Commit

Permalink
This change adds a new compaction pool which enforces a global max
Browse files Browse the repository at this point in the history
compaction concurrency in a multi-store configuration. Each Pebble store
(i.e. an instance of *DB) still maintains its own per-store compaction
concurrency which is controlled by `opts.MaxConcurrentCompactions`.
However, in a multi-store configuration, disk I/O is a per-store resource
while CPU is shared across stores. A significant portion of compaction
is CPU-intensive, and so this ensures that excessive compactions don't
interrupt foreground CPU tasks even if the disks are capable of handling
the additional throughput from those compactions.

The shared compaction concurrency only applies to automatic compactions
This means that delete-only compactions are excluded because they are
expected to be cheap, as are flushes because they should never be
blocked.

Fixes: #3813
Informs: cockroachdb/cockroach#74697
  • Loading branch information
anish-shanbhag committed Aug 28, 2024
1 parent 94561af commit 6d38ddb
Show file tree
Hide file tree
Showing 8 changed files with 443 additions and 252 deletions.
425 changes: 308 additions & 117 deletions compaction.go

Large diffs are not rendered by default.

213 changes: 95 additions & 118 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manifest"
)
Expand Down Expand Up @@ -192,6 +191,8 @@ type pickedCompaction struct {
score float64
// kind indicates the kind of compaction.
kind compactionKind
// isManual indicates whether this compaction was manually triggered.
isManual bool
// startLevel is the level that is being compacted. Inputs from startLevel
// and outputLevel will be merged to produce a set of outputLevel files.
startLevel *compactionLevel
Expand Down Expand Up @@ -1156,16 +1157,13 @@ func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fi
return uint64(totalGarbage) / uint64(useCount)
}

// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
//
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for an elision-only compaction to remove obsolete keys.
// pickAuto picks the best compaction, if any. It first tries to find a
// score-based compaction; if one cannot be found, pickAuto falls back to
// various other compaction picking methods.
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
// We first check if this DB has reached its current max compaction
// concurrency.
//
// Compaction concurrency is controlled by L0 read-amp. We allow one
// additional compaction per L0CompactionConcurrency sublevels, as well as
// one additional compaction per CompactionDebtConcurrency bytes of
Expand All @@ -1184,107 +1182,9 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
}
}

scores := p.calculateLevelScores(env.inProgressCompactions)

// TODO(bananabrick): Either remove, or change this into an event sent to the
// EventListener.
logCompaction := func(pc *pickedCompaction) {
var buf bytes.Buffer
for i := 0; i < numLevels; i++ {
if i != 0 && i < p.baseLevel {
continue
}

var info *candidateLevelInfo
for j := range scores {
if scores[j].level == i {
info = &scores[j]
break
}
}

marker := " "
if pc.startLevel.level == info.level {
marker = "*"
}
fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %5.1f %8s %8s",
marker, info.level, info.compensatedScoreRatio, info.compensatedScore,
info.uncompensatedScoreRatio, info.uncompensatedScore,
humanize.Bytes.Int64(int64(totalCompensatedSize(
p.vers.Levels[info.level].Iter(),
))),
humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
)

count := 0
for i := range env.inProgressCompactions {
c := &env.inProgressCompactions[i]
if c.inputs[0].level != info.level {
continue
}
count++
if count == 1 {
fmt.Fprintf(&buf, " [")
} else {
fmt.Fprintf(&buf, " ")
}
fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
}
if count > 0 {
fmt.Fprintf(&buf, "]")
}
fmt.Fprintf(&buf, "\n")
}
p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
pc.startLevel.level, pc.outputLevel.level, buf.String())
}

// Check for a score-based compaction. candidateLevelInfos are first sorted
// by whether they should be compacted, so if we find a level which shouldn't
// be compacted, we can break early.
for i := range scores {
info := &scores[i]
if !info.shouldCompact() {
break
}
if info.level == numLevels-1 {
continue
}

if info.level == 0 {
pc = pickL0(env, p.opts, p.vers, p.baseLevel)
// Fail-safe to protect against compacting the same sstable
// concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
p.addScoresToPickedCompactionMetrics(pc, scores)
pc.score = info.compensatedScoreRatio
// TODO(bananabrick): Create an EventListener for logCompaction.
if false {
logCompaction(pc)
}
return pc
}
continue
}

// info.level > 0
var ok bool
info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
if !ok {
continue
}

pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)
// Fail-safe to protect against compacting the same sstable concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
p.addScoresToPickedCompactionMetrics(pc, scores)
pc.score = info.compensatedScoreRatio
// TODO(bananabrick): Create an EventListener for logCompaction.
if false {
logCompaction(pc)
}
return pc
}
// Score-based compactions have the highest priority.
if pc := p.pickScoreBasedCompaction(env); pc != nil {
return pc
}

// Check for files which contain excessive point tombstones that could slow
Expand Down Expand Up @@ -1337,8 +1237,61 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// MarkedForCompaction field is persisted in the manifest. That's okay. We
// previously would've ignored the designation, whereas now we'll re-compact
// the file in place.
if p.vers.Stats.MarkedForCompaction > 0 {
if pc := p.pickRewriteCompaction(env); pc != nil {
if pc := p.pickRewriteCompaction(env); pc != nil {
return pc
}

return nil
}

// pickScoreBasedCompaction computes per-level size adjustments based on
// in-progress compactions, and computes a per-level score. The levels are
// iterated over in decreasing score order trying to find a valid compaction
// anchored at that level.
func (p *compactionPickerByScore) pickScoreBasedCompaction(
env compactionEnv,
) (pc *pickedCompaction) {
scores := p.calculateLevelScores(env.inProgressCompactions)

// Check for a score-based compaction. candidateLevelInfos are first sorted
// by whether they should be compacted, so if we find a level which shouldn't
// be compacted, we can break early.
for i := range scores {
info := &scores[i]
if !info.shouldCompact() {
break
}
if info.level == numLevels-1 {
continue
}

withScore := func(pc *pickedCompaction) *pickedCompaction {
// Fail-safe to protect against compacting the same sstable
// concurrently.
if pc == nil || inputRangeAlreadyCompacting(env, pc) {
return nil
}
p.addScoresToPickedCompactionMetrics(pc, scores)
pc.score = info.compensatedScoreRatio
pc.kind = compactionKindDefault
return pc
}

if info.level == 0 {
if pc = withScore(pickL0(env, p.opts, p.vers, p.baseLevel)); pc != nil {
return pc
}
continue
}

// info.level > 0
var ok bool
info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
if !ok {
continue
}

if pc = withScore(pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)); pc != nil {
return pc
}
}
Expand Down Expand Up @@ -1426,7 +1379,12 @@ var markedForCompactionAnnotator = &manifest.Annotator[fileMetadata]{
// with various checks to ensure that the file still exists in the expected level
// and isn't already being compacted.
func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
candidate *fileMetadata, env compactionEnv, startLevel int, outputLevel int, kind compactionKind,
candidate *fileMetadata,
env compactionEnv,
startLevel int,
outputLevel int,
kind compactionKind,
score float64,
) *pickedCompaction {
if candidate == nil || candidate.IsCompacting() {
return nil
Expand All @@ -1453,6 +1411,7 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(

pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel)
pc.kind = kind
pc.score = score
pc.startLevel.files = inputs
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

Expand Down Expand Up @@ -1483,7 +1442,8 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
return nil
}
return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly)
score := float64(max(candidate.Stats.RangeDeletionsBytesEstimate/max(candidate.Size, 1), candidate.Stats.NumDeletions/max(candidate.Stats.NumEntries, 1)))
return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly, score)
}

// pickRewriteCompaction attempts to construct a compaction that
Expand All @@ -1492,13 +1452,17 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
// necessary. A rewrite compaction outputs files to the same level as
// the input level.
func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
if p.vers.Stats.MarkedForCompaction == 0 {
return nil
}

for l := numLevels - 1; l >= 0; l-- {
candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
if candidate == nil {
// Try the next level.
continue
}
pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite)
pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite, 0)
if pc != nil {
return pc
}
Expand Down Expand Up @@ -1551,7 +1515,11 @@ func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
}
}

return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity)
if candidate == nil {
return nil
}

return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity, candidate.Stats.TombstoneDenseBlocksRatio)
}

// pickAutoLPositive picks an automatic compaction for the candidate
Expand Down Expand Up @@ -1755,7 +1723,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc
return pc
}

func pickManualCompaction(
func newPickedManualCompaction(
vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
outputLevel := manual.level + 1
Expand All @@ -1779,6 +1747,11 @@ func pickManualCompaction(
return nil, true
}
pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
pc.kind = compactionKindDefault
pc.isManual = true
// NB: we set the score to math.MaxFloat64 so that manual compactions are
// always prioritized above automatic compactions.
pc.score = math.MaxFloat64
manual.outputLevel = pc.outputLevel.level
pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
if pc.startLevel.files.Empty() {
Expand Down Expand Up @@ -1909,6 +1882,10 @@ func pickReadTriggeredCompactionHelper(
return nil
}

// Prioritize read compactions with a smaller initial file size, since
// they will be cheaper to perform.
pc.score = -float64(outputOverlaps.SizeSum() + overlapSlice.SizeSum())

return pc
}

Expand Down
18 changes: 8 additions & 10 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
end: iEnd.UserKey,
}

pc, retryLater := pickManualCompaction(
pc, retryLater := newPickedManualCompaction(
pickerByScore.vers,
opts,
compactionEnv{
Expand Down Expand Up @@ -1413,17 +1413,15 @@ func TestCompactionPickerPickFile(t *testing.T) {
d.mu.Lock()
defer d.mu.Unlock()

// Use maybeScheduleCompactionPicker to take care of all of the
// initialization of the compaction-picking environment, but never
// pick a compaction; just call pickFile using the user-provided
// level.
var lf manifest.LevelFile
var ok bool
d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction {
p := untypedPicker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
return nil
})
env := d.makeCompactionEnv()
if env == nil {
return "unable to lock the DB for compaction picking"
}
p := d.mu.versions.picker.(*compactionPickerByScore)
lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
d.mu.versions.logUnlock()
if !ok {
return "(none)"
}
Expand Down
14 changes: 14 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,20 @@ type DB struct {
// compactionShedulers.Wait() should not be called while the DB.mu is held.
compactionSchedulers sync.WaitGroup

// compactionPool is responsible for scheduling both automatic and manual
// compactions.
//
// The compactionPool may enforce a global max compaction concurrency in a
// multi-store configuration. Each Pebble store (i.e. an instance of *DB)
// has its own per-store compaction concurrency which is controlled by
// opts.MaxConcurrentCompactions. However, in a multi-store configuration,
// disk I/O is a per-store resource while CPU is shared across stores.
// A significant portion of compaction is CPU-intensive, and so
// CompactionPool may be used to ensure that excessive compactions don't
// interrupt foreground CPU tasks even if the disks are capable of handling
// the additional throughput from those compactions.
compactionPool CompactionPool

// The main mutex protecting internal DB state. This mutex encompasses many
// fields because those fields need to be accessed and updated atomically. In
// particular, the current version, log.*, mem.*, and snapshot list need to
Expand Down
12 changes: 6 additions & 6 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,13 @@ func (d *DB) writeFormatVersionMarker(formatVers FormatMajorVersion) error {
// waiting for compactions to complete (or for slots to free up).
func (d *DB) compactMarkedFilesLocked() error {
curr := d.mu.versions.currentVersion()
if curr.Stats.MarkedForCompaction == 0 {
return nil
}
// Attempt to schedule a compaction to rewrite a file marked for
// compaction.
d.maybeScheduleCompaction()
for curr.Stats.MarkedForCompaction > 0 {
// Attempt to schedule a compaction to rewrite a file marked for
// compaction.
d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction {
return picker.pickRewriteCompaction(env)
})

// The above attempt might succeed and schedule a rewrite compaction. Or
// there might not be available compaction concurrency to schedule the
// compaction. Or compaction of the file might have already been in
Expand Down
2 changes: 2 additions & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
}
d.updateReadStateLocked(d.opts.DebugCheck)

d.compactionPool = opts.CompactionPool

if !d.opts.ReadOnly {
// If the Options specify a format major version higher than the
// loaded database's, upgrade it. If this is a new database, this
Expand Down
Loading

0 comments on commit 6d38ddb

Please sign in to comment.