From 804f04bd661ecd988a8827278c34b0a755247683 Mon Sep 17 00:00:00 2001
From: Anish Shanbhag
Date: Fri, 30 Aug 2024 14:37:33 -0400
Subject: [PATCH] compact: add shared compaction pool for multiple stores

This change adds a new compaction pool which enforces a global max
compaction concurrency in a multi-store configuration. Each Pebble store
(i.e. an instance of *DB) still maintains its own per-store compaction
concurrency, which is controlled by `opts.MaxConcurrentCompactions`.
However, in a multi-store configuration, disk I/O is a per-store resource
while CPU is shared across stores. A significant portion of compaction
work is CPU-intensive, so the shared pool ensures that excessive
compactions don't interrupt foreground CPU tasks even if the disks are
capable of handling the additional throughput from those compactions.

The shared compaction concurrency only applies to automatic compactions.
Delete-only compactions are excluded because they are expected to be
cheap, and flushes are excluded because they should never be blocked.

Fixes: #3813
Informs: https://github.com/cockroachdb/cockroach/issues/74697
---
 compaction.go | 450 +++++++++++++++++++-------
 compaction_picker.go | 213 ++++++------
 compaction_picker_test.go | 18 +-
 compaction_test.go | 104 ++++++
 db.go | 14 +
 db_test.go | 7 +
 format_major_version.go | 12 +-
 open.go | 2 +
 options.go | 9 +
 snapshot.go | 2 +-
 testdata/prioritizing_compaction_pool | 110 +++++++
 11 files changed, 685 insertions(+), 256 deletions(-)
 create mode 100644 testdata/prioritizing_compaction_pool

diff --git a/compaction.go b/compaction.go
index fe6024bc42..2163c37818 100644
--- a/compaction.go
+++ b/compaction.go
@@ -11,6 +11,7 @@ import (
 	"math"
 	"runtime/pprof"
 	"slices"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -121,21 +122,28 @@ func (c *compactionWritable) Write(p []byte) error {
 
 type compactionKind int
 
+// The ordering of these compaction kinds is important - it is used to
+// determine the priority of a pickedCompaction when multiple DBs are waiting
+// to schedule a compaction. A compactionKind is considered strictly
+// higher priority than the one below it.
+//
+// The ordering of these compactionKinds should mirror the order in which
+// compaction types are picked in compactionPicker.
 const (
-	compactionKindDefault compactionKind = iota
-	compactionKindFlush
+	compactionKindFlush compactionKind = iota
+	// compactionKindDeleteOnly denotes a compaction that only deletes input
+	// files. It can occur when wide range tombstones completely contain sstables.
+	compactionKindDeleteOnly
 	// compactionKindMove denotes a move compaction where the input file is
 	// retained and linked in a new level without being obsoleted.
 	compactionKindMove
+	compactionKindDefault
 	// compactionKindCopy denotes a copy compaction where the input file is
 	// copied byte-by-byte into a new file with a new FileNum in the output level.
 	compactionKindCopy
-	// compactionKindDeleteOnly denotes a compaction that only deletes input
-	// files. It can occur when wide range tombstones completely contain sstables.
- compactionKindDeleteOnly + compactionKindTombstoneDensity compactionKindElisionOnly compactionKindRead - compactionKindTombstoneDensity compactionKindRewrite compactionKindIngestedFlushable ) @@ -929,10 +937,9 @@ func (c *compaction) String() string { } var buf bytes.Buffer - for level := c.startLevel.level; level <= c.outputLevel.level; level++ { - i := level - c.startLevel.level - fmt.Fprintf(&buf, "%d:", level) - iter := c.inputs[i].files.Iter() + for _, input := range c.inputs { + fmt.Fprintf(&buf, "%d:", input.level) + iter := input.files.Iter() for f := iter.First(); f != nil; f = iter.Next() { fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest) } @@ -1644,34 +1651,19 @@ func (d *DB) maybeScheduleCompactionAsync() { d.mu.Unlock() } -// maybeScheduleCompaction schedules a compaction if necessary. -// -// d.mu must be held when calling this. -func (d *DB) maybeScheduleCompaction() { - d.maybeScheduleCompactionPicker(pickAuto) -} - -func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickAuto(env) -} - -func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickElisionOnlyCompaction(env) -} - -// tryScheduleDownloadCompaction tries to start a download compaction. -// -// Returns true if we started a download compaction (or completed it -// immediately because it is a no-op or we hit an error). +// tryScheduleDownloadCompactions tries to start download compactions. // // Requires d.mu to be held. Updates d.mu.compact.downloads. -func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool { +func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDownloads int) { vers := d.mu.versions.currentVersion() for i := 0; i < len(d.mu.compact.downloads); { + if d.mu.compact.downloadingCount >= maxConcurrentDownloads { + break + } download := d.mu.compact.downloads[i] switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) { case launchedCompaction: - return true + continue case didNotLaunchCompaction: // See if we can launch a compaction for another download task. i++ @@ -1680,83 +1672,326 @@ func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownl d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1) } } - return false } -// maybeScheduleCompactionPicker schedules a compaction if necessary, -// calling `pickFunc` to pick automatic compactions. +// makeCompactionEnv attempts to acquire d.mu.versions.logLock in order +// to provide the proper mutual exclusion necessary during compaction picking. +// If the DB is closed or marked as read-only, makeCompactionEnv returns nil to +// indicate that compactions may not be performed. Otherwise, a new +// compactionEnv is constructed using the current DB state. +// +// Compaction picking needs a coherent view of a Version. In particular, we +// need to exclude concurrent ingestions from making a decision on which level +// to ingest into that conflicts with our compaction +// decision. versionSet.logLock provides the necessary mutual exclusion. +// +// NOTE: makeCompactionEnv does not call d.mu.versions.logUnlock; it is the +// caller's responsibility to ensure that the manifest is unlocked. // // Requires d.mu to be held. 
-func (d *DB) maybeScheduleCompactionPicker( - pickFunc func(compactionPicker, compactionEnv) *pickedCompaction, -) { +func (d *DB) makeCompactionEnv() *compactionEnv { if d.closed.Load() != nil || d.opts.ReadOnly { - return - } - maxCompactions := d.opts.MaxConcurrentCompactions() - maxDownloads := d.opts.MaxConcurrentDownloads() - - if d.mu.compact.compactingCount >= maxCompactions && - (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) { - if len(d.mu.compact.manual) > 0 { - // Inability to run head blocks later manual compactions. - d.mu.compact.manual[0].retries++ - } - return + return nil } - // Compaction picking needs a coherent view of a Version. In particular, we - // need to exclude concurrent ingestions from making a decision on which level - // to ingest into that conflicts with our compaction - // decision. versionSet.logLock provides the necessary mutual exclusion. d.mu.versions.logLock() - defer d.mu.versions.logUnlock() // Check for the closed flag again, in case the DB was closed while we were // waiting for logLock(). if d.closed.Load() != nil { - return + d.mu.versions.logUnlock() + return nil } - env := compactionEnv{ + return &compactionEnv{ diskAvailBytes: d.diskAvailBytes.Load(), earliestSnapshotSeqNum: d.mu.snapshots.earliest(), earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(), + inProgressCompactions: d.getInProgressCompactionInfoLocked(nil), + readCompactionEnv: readCompactionEnv{ + readCompactions: &d.mu.compact.readCompactions, + flushing: d.mu.compact.flushing || d.passedFlushThreshold(), + rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, + }, + } +} + +// pickAnyCompaction tries to pick a manual or automatic compaction. +func (d *DB) pickAnyCompaction(env compactionEnv) (pc *pickedCompaction) { + pc = d.pickManualCompaction(env) + if pc == nil && !d.opts.DisableAutomaticCompactions { + pc = d.mu.versions.picker.pickAuto(env) + } + return pc +} + +// runPickedCompaction kicks off the provided pickedCompaction. In case the +// pickedCompaction is a manual compaction, the corresponding manualCompaction +// is removed from d.mu.compact.manual. +func (d *DB) runPickedCompaction(pc *pickedCompaction) { + var doneChannel chan error + if pc.isManual { + doneChannel = d.mu.compact.manual[0].done + d.mu.compact.manual = d.mu.compact.manual[1:] + } + + d.mu.compact.compactingCount++ + compaction := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider()) + d.addInProgressCompaction(compaction) + go func() { + d.compact(compaction, doneChannel) + d.compactionPool.CompactionFinished(pc) + }() +} + +// CompactionPool is responsible for scheduling both automatic and manual +// compactions. In the case of multiple DB instances (i.e. a multi-store +// configuration), implementations of CompactionPool may or may not enforce +// a global maximum compaction concurrency. +type CompactionPool interface { + // AddWaitingDB signals to the CompactionPool that the provided DB might + // have compaction(s) that need to be scheduled. Implementations of + // CompactionPool may decide when a compaction will actually be picked + // and run from this DB. + // + // DB.mu should NOT be held for any DB (including d) when AddWaitingDB is + // called. + AddWaitingDB(d *DB) + // CompactionFinished signals to the CompactionPool that the provided + // pickedCompaction has now finished running. 
+ CompactionFinished(pc *pickedCompaction) +} + +var defaultCompactionPool = &UnlimitedCompactionPool{} + +// UnlimitedCompactionPool implements CompactionPool. It does not enforce any +// global maximum compaction concurrency when multiple DBs attempt to schedule +// compactions. +type UnlimitedCompactionPool struct{} + +func (ucp *UnlimitedCompactionPool) AddWaitingDB(d *DB) { + d.mu.Lock() + env := d.makeCompactionEnv() + if env == nil { + d.mu.Unlock() + return + } + + pc := d.pickAnyCompaction(*env) + if pc != nil { + d.runPickedCompaction(pc) + // We might be able to schedule more compactions. + defer ucp.AddWaitingDB(d) + } + + d.mu.versions.logUnlock() + d.mu.Unlock() +} + +func (ucp *UnlimitedCompactionPool) CompactionFinished(pc *pickedCompaction) { + // No-op for an UnlimitedCompactionPool. +} + +// PrioritizingCompactionPool enforces a global max compaction concurrency +// in a multi-store configuration. If multiple DBs are waiting to perform a +// compaction, it prioritizes the DB whose pickedCompaction has the highest +// priority. +type PrioritizingCompactionPool struct { + mu sync.Mutex + // cond is used during testing to signal that a compaction has finished. + cond sync.Cond + // compactingCount is the current number of running compactions across + // all DBs. + compactingCount int + // waiting contains all DBs which might have compactions that need to be + // scheduled. The value stored for each DB may be nil to indicate that + // a compaction needs to be picked from the DB. + waiting map[*DB]*pickedCompaction + // maxCompactionConcurrency defines the global max compaction concurrency + // across all DBs. + maxCompactionConcurrency int +} + +// NewPrioritizingCompactionPool creates a new PrioritizingCompactionPool +// with the specified maxCompactionConcurrency. +func NewPrioritizingCompactionPool(maxCompactionConcurrency int) *PrioritizingCompactionPool { + if maxCompactionConcurrency <= 0 { + panic("pebble: maxCompactionConcurrency for a CompactionPool must be greater than 0") + } + pcp := &PrioritizingCompactionPool{ + maxCompactionConcurrency: maxCompactionConcurrency, + waiting: make(map[*DB]*pickedCompaction), + } + pcp.cond.L = &pcp.mu + return pcp +} + +// shouldLimitConcurrency returns true if the provided pickedCompaction should +// be counted towards this PrioritizingCompactionPool's compactingCount. +func (pcp *PrioritizingCompactionPool) shouldLimitConcurrency(pc *pickedCompaction) bool { + return pc.kind != compactionKindDeleteOnly && pc.kind != compactionKindMove +} + +// comparePickedCompactions returns true if pc1 is higher priority than pc2. +func comparePickedCompactions(pc1 *pickedCompaction, pc2 *pickedCompaction) bool { + if pc1 == nil { + return false + } else if pc2 == nil { + return true + } + + // If pc1 and pc2 are of different compactionKinds, pc1 is higher priority + // only if its compactionKind is higher priority. The relative priorities + // of compactionKinds are determined by the order they are defined in. See + // the comment below compactionKind. + if pc1.kind < pc2.kind { + return true + } + // Otherwise, use the pickedCompaction's score to break ties. This is the + // score of the level in the case of score-based compactions; other + // compaction types assign the score differently. + return pc1.score > pc2.score +} + +// maybeScheduleWaitingCompactionLocked attempts to schedule a waiting +// compaction from the list of waiting DBs. 
It prioritizes the DB with the +// highest priority pickedCompaction as defined by comparePickedCompactions. +// +// c.mu must be held. DB.mu must not be held for any DB. +func (pcp *PrioritizingCompactionPool) maybeScheduleWaitingCompactionLocked() { + if pcp.compactingCount >= pcp.maxCompactionConcurrency { + return } - if d.mu.compact.compactingCount < maxCompactions { - // Check for delete-only compactions first, because they're expected to be - // cheap and reduce future compaction work. - if !d.opts.private.disableDeleteOnlyCompactions && - !d.opts.DisableAutomaticCompactions && - len(d.mu.compact.deletionHints) > 0 { - d.tryScheduleDeleteOnlyCompaction() + unlockDB := func(d *DB) { + d.mu.versions.logUnlock() + d.mu.Unlock() + } + + var selectedDB *DB + // We need to find the highest-priority compaction across all waiting DBs. + // This is tricky because concurrent flushes, ingestions, etc. from each DB + // may invalidate a compaction after it is picked. For this reason, the + // below loop maintains the invariant that both selectedDB.mu and + // selectedDB.mu.versions.logLock are continuously held until we either find + // a higher priority compaction from one of the other DBs, or we finish + // iteration and start the compaction. + // + // Although the ordering of pcp.waiting is arbitrary and thus the order in + // locks are acquired is also arbitrary, this is fine only because we hold + // c.mu throughout this method. + for d := range pcp.waiting { + d.mu.Lock() + + env := d.makeCompactionEnv() + if env == nil { + // This DB is read-only or closed; skip it. + delete(pcp.waiting, d) + d.mu.Unlock() + continue } - for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions { - if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) { - // Inability to run head blocks later manual compactions. - manual.retries++ - break + if pcp.waiting[d] == nil { + if pcp.waiting[d] = d.pickAnyCompaction(*env); pcp.waiting[d] == nil { + // There are no compactions that can be scheduled from this DB. + // Mark it as no longer waiting. + delete(pcp.waiting, d) + unlockDB(d) + continue } - d.mu.compact.manual = d.mu.compact.manual[1:] } - - for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions && - d.tryScheduleAutoCompaction(env, pickFunc) { + if selectedDB == nil { + selectedDB = d + } else if comparePickedCompactions(pcp.waiting[d], pcp.waiting[selectedDB]) { + // We've found a higher priority pickedCompaction - first unlock the old + // selectedDB, and then swap it out for the current DB. + // + // NB: the pickedCompaction for the previous selectedDB can be cached + // because it will still be valid until d.maybeScheduleCompaction is + // called in the future, at which point the cached pickedCompaction + // will be invalidated. + // + // TODO: the above is currently not true because of a data race; we need + // to atomically invalidate the pickedCompaction before release d.mu + // inside maybeScheduleCompaction. + unlockDB(selectedDB) + selectedDB = d + } else { + // This DB's pickedCompaction is lower priority than that of selectedDB. + // Release locks and continue. + unlockDB(d) } } - for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads && - d.tryScheduleDownloadCompaction(env, maxDownloads) { + if selectedDB == nil { + return + } + + // At this point, locks are held only for selectedDB.mu. 
+ pc := pcp.waiting[selectedDB] + if pcp.shouldLimitConcurrency(pc) { + pcp.compactingCount++ + } + selectedDB.runPickedCompaction(pc) + pcp.waiting[selectedDB] = nil + + unlockDB(selectedDB) + pcp.maybeScheduleWaitingCompactionLocked() +} + +func (pcp *PrioritizingCompactionPool) CompactionFinished(pc *pickedCompaction) { + if pcp.shouldLimitConcurrency(pc) { + pcp.mu.Lock() + defer pcp.mu.Unlock() + + pcp.compactingCount-- + pcp.maybeScheduleWaitingCompactionLocked() + pcp.cond.Broadcast() } } +func (pcp *PrioritizingCompactionPool) AddWaitingDB(d *DB) { + pcp.mu.Lock() + defer pcp.mu.Unlock() + // Mark this DB as waiting, but also invalidate any existing + // pickedCompaction from this DB since a higher priority compaction + // could be picked. + pcp.waiting[d] = nil + pcp.maybeScheduleWaitingCompactionLocked() +} + +// maybeScheduleCompaction schedules a compaction if necessary. +// +// Requires d.mu to be held. +func (d *DB) maybeScheduleCompaction() { + env := d.makeCompactionEnv() + if env == nil { + return + } + // Delete-only compactions are expected to be cheap and reduce future + // compaction work, so schedule them directly instead of using + // d.compactionPool. + d.tryScheduleDeleteOnlyCompaction() + // Download compactions have their own concurrency. + d.tryScheduleDownloadCompactions(*env, d.opts.MaxConcurrentDownloads()) + d.mu.versions.logUnlock() + + // NB: we must release d.mu to avoid deadlock when calling + // addWaitingDB below. + d.mu.Unlock() + d.compactionPool.AddWaitingDB(d) + d.mu.Lock() +} + // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction // for all files that can be deleted as suggested by deletionHints. // // Requires d.mu to be held. Updates d.mu.compact.deletionHints. func (d *DB) tryScheduleDeleteOnlyCompaction() { + if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions || d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() || len(d.mu.compact.deletionHints) == 0 { + return + } + v := d.mu.versions.currentVersion() snapshots := d.mu.snapshots.toSlice() inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots) @@ -1770,56 +2005,29 @@ func (d *DB) tryScheduleDeleteOnlyCompaction() { } } -// tryScheduleManualCompaction tries to kick off the given manual compaction. -// -// Returns false if we are not able to run this compaction at this time. -// -// Requires d.mu to be held. -func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool { +func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) { v := d.mu.versions.currentVersion() - env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) - pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual) - if pc == nil { - if !retryLater { - // Manual compaction is a no-op. Signal completion and exit. - manual.done <- nil - return true - } - // We are not able to run this manual compaction at this time. - return false - } - - c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider()) - d.mu.compact.compactingCount++ - d.addInProgressCompaction(c) - go d.compact(c, manual.done) - return true -} + for len(d.mu.compact.manual) > 0 { + manual := d.mu.compact.manual[0] + if d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() { + manual.retries++ + return nil + } -// tryScheduleAutoCompaction tries to kick off an automatic compaction. 
-// -// Returns false if no automatic compactions are necessary or able to run at -// this time. -// -// Requires d.mu to be held. -func (d *DB) tryScheduleAutoCompaction( - env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction, -) bool { - env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) - env.readCompactionEnv = readCompactionEnv{ - readCompactions: &d.mu.compact.readCompactions, - flushing: d.mu.compact.flushing || d.passedFlushThreshold(), - rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction, - } - pc := pickFunc(d.mu.versions.picker, env) - if pc == nil { - return false + pc, retryLater := newPickedManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual) + if pc != nil { + return pc + } + if retryLater { + // We are not able to run this manual compaction at this time. + manual.retries++ + return nil + } + // Manual compaction is a no-op. Signal that it's complete. + manual.done <- nil + d.mu.compact.manual = d.mu.compact.manual[1:] } - c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider()) - d.mu.compact.compactingCount++ - d.addInProgressCompaction(c) - go d.compact(c, nil) - return true + return nil } // deleteCompactionHintType indicates whether the deleteCompactionHint was diff --git a/compaction_picker.go b/compaction_picker.go index 79578d0749..baa239a79d 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" - "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/manifest" ) @@ -192,6 +191,8 @@ type pickedCompaction struct { score float64 // kind indicates the kind of compaction. kind compactionKind + // isManual indicates whether this compaction was manually triggered. + isManual bool // startLevel is the level that is being compacted. Inputs from startLevel // and outputLevel will be merged to produce a set of outputLevel files. startLevel *compactionLevel @@ -1156,16 +1157,13 @@ func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fi return uint64(totalGarbage) / uint64(useCount) } -// pickAuto picks the best compaction, if any. -// -// On each call, pickAuto computes per-level size adjustments based on -// in-progress compactions, and computes a per-level score. The levels are -// iterated over in decreasing score order trying to find a valid compaction -// anchored at that level. -// -// If a score-based compaction cannot be found, pickAuto falls back to looking -// for an elision-only compaction to remove obsolete keys. +// pickAuto picks the best compaction, if any. It first tries to find a +// score-based compaction; if one cannot be found, pickAuto falls back to +// various other compaction picking methods. func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) { + // We first check if this DB has reached its current max compaction + // concurrency. + // // Compaction concurrency is controlled by L0 read-amp. We allow one // additional compaction per L0CompactionConcurrency sublevels, as well as // one additional compaction per CompactionDebtConcurrency bytes of @@ -1184,107 +1182,9 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact } } - scores := p.calculateLevelScores(env.inProgressCompactions) - - // TODO(bananabrick): Either remove, or change this into an event sent to the - // EventListener. 
- logCompaction := func(pc *pickedCompaction) { - var buf bytes.Buffer - for i := 0; i < numLevels; i++ { - if i != 0 && i < p.baseLevel { - continue - } - - var info *candidateLevelInfo - for j := range scores { - if scores[j].level == i { - info = &scores[j] - break - } - } - - marker := " " - if pc.startLevel.level == info.level { - marker = "*" - } - fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %5.1f %8s %8s", - marker, info.level, info.compensatedScoreRatio, info.compensatedScore, - info.uncompensatedScoreRatio, info.uncompensatedScore, - humanize.Bytes.Int64(int64(totalCompensatedSize( - p.vers.Levels[info.level].Iter(), - ))), - humanize.Bytes.Int64(p.levelMaxBytes[info.level]), - ) - - count := 0 - for i := range env.inProgressCompactions { - c := &env.inProgressCompactions[i] - if c.inputs[0].level != info.level { - continue - } - count++ - if count == 1 { - fmt.Fprintf(&buf, " [") - } else { - fmt.Fprintf(&buf, " ") - } - fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel) - } - if count > 0 { - fmt.Fprintf(&buf, "]") - } - fmt.Fprintf(&buf, "\n") - } - p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s", - pc.startLevel.level, pc.outputLevel.level, buf.String()) - } - - // Check for a score-based compaction. candidateLevelInfos are first sorted - // by whether they should be compacted, so if we find a level which shouldn't - // be compacted, we can break early. - for i := range scores { - info := &scores[i] - if !info.shouldCompact() { - break - } - if info.level == numLevels-1 { - continue - } - - if info.level == 0 { - pc = pickL0(env, p.opts, p.vers, p.baseLevel) - // Fail-safe to protect against compacting the same sstable - // concurrently. - if pc != nil && !inputRangeAlreadyCompacting(env, pc) { - p.addScoresToPickedCompactionMetrics(pc, scores) - pc.score = info.compensatedScoreRatio - // TODO(bananabrick): Create an EventListener for logCompaction. - if false { - logCompaction(pc) - } - return pc - } - continue - } - - // info.level > 0 - var ok bool - info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum) - if !ok { - continue - } - - pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel) - // Fail-safe to protect against compacting the same sstable concurrently. - if pc != nil && !inputRangeAlreadyCompacting(env, pc) { - p.addScoresToPickedCompactionMetrics(pc, scores) - pc.score = info.compensatedScoreRatio - // TODO(bananabrick): Create an EventListener for logCompaction. - if false { - logCompaction(pc) - } - return pc - } + // Score-based compactions have the highest priority. + if pc := p.pickScoreBasedCompaction(env); pc != nil { + return pc } // Check for files which contain excessive point tombstones that could slow @@ -1337,8 +1237,61 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact // MarkedForCompaction field is persisted in the manifest. That's okay. We // previously would've ignored the designation, whereas now we'll re-compact // the file in place. - if p.vers.Stats.MarkedForCompaction > 0 { - if pc := p.pickRewriteCompaction(env); pc != nil { + if pc := p.pickRewriteCompaction(env); pc != nil { + return pc + } + + return nil +} + +// pickScoreBasedCompaction computes per-level size adjustments based on +// in-progress compactions, and computes a per-level score. The levels are +// iterated over in decreasing score order trying to find a valid compaction +// anchored at that level. 
+func (p *compactionPickerByScore) pickScoreBasedCompaction( + env compactionEnv, +) (pc *pickedCompaction) { + scores := p.calculateLevelScores(env.inProgressCompactions) + + // Check for a score-based compaction. candidateLevelInfos are first sorted + // by whether they should be compacted, so if we find a level which shouldn't + // be compacted, we can break early. + for i := range scores { + info := &scores[i] + if !info.shouldCompact() { + break + } + if info.level == numLevels-1 { + continue + } + + withScore := func(pc *pickedCompaction) *pickedCompaction { + // Fail-safe to protect against compacting the same sstable + // concurrently. + if pc == nil || inputRangeAlreadyCompacting(env, pc) { + return nil + } + p.addScoresToPickedCompactionMetrics(pc, scores) + pc.score = info.compensatedScoreRatio + pc.kind = compactionKindDefault + return pc + } + + if info.level == 0 { + if pc = withScore(pickL0(env, p.opts, p.vers, p.baseLevel)); pc != nil { + return pc + } + continue + } + + // info.level > 0 + var ok bool + info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum) + if !ok { + continue + } + + if pc = withScore(pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)); pc != nil { return pc } } @@ -1426,7 +1379,12 @@ var markedForCompactionAnnotator = &manifest.Annotator[fileMetadata]{ // with various checks to ensure that the file still exists in the expected level // and isn't already being compacted. func (p *compactionPickerByScore) pickedCompactionFromCandidateFile( - candidate *fileMetadata, env compactionEnv, startLevel int, outputLevel int, kind compactionKind, + candidate *fileMetadata, + env compactionEnv, + startLevel int, + outputLevel int, + kind compactionKind, + score float64, ) *pickedCompaction { if candidate == nil || candidate.IsCompacting() { return nil @@ -1453,6 +1411,7 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile( pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel) pc.kind = kind + pc.score = score pc.startLevel.files = inputs pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter()) @@ -1483,7 +1442,8 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum { return nil } - return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly) + score := float64(max(candidate.Stats.RangeDeletionsBytesEstimate/max(candidate.Size, 1), candidate.Stats.NumDeletions/max(candidate.Stats.NumEntries, 1))) + return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly, score) } // pickRewriteCompaction attempts to construct a compaction that @@ -1492,13 +1452,17 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( // necessary. A rewrite compaction outputs files to the same level as // the input level. func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) { + if p.vers.Stats.MarkedForCompaction == 0 { + return nil + } + for l := numLevels - 1; l >= 0; l-- { candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l]) if candidate == nil { // Try the next level. 
continue } - pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite) + pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite, 0) if pc != nil { return pc } @@ -1551,7 +1515,11 @@ func (p *compactionPickerByScore) pickTombstoneDensityCompaction( } } - return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity) + if candidate == nil { + return nil + } + + return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity, candidate.Stats.TombstoneDenseBlocksRatio) } // pickAutoLPositive picks an automatic compaction for the candidate @@ -1755,7 +1723,7 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc return pc } -func pickManualCompaction( +func newPickedManualCompaction( vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction, ) (pc *pickedCompaction, retryLater bool) { outputLevel := manual.level + 1 @@ -1779,6 +1747,11 @@ func pickManualCompaction( return nil, true } pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel) + pc.kind = compactionKindDefault + pc.isManual = true + // NB: we set the score to math.MaxFloat64 so that manual compactions are + // always prioritized above automatic compactions. + pc.score = math.MaxFloat64 manual.outputLevel = pc.outputLevel.level pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end)) if pc.startLevel.files.Empty() { @@ -1909,6 +1882,10 @@ func pickReadTriggeredCompactionHelper( return nil } + // Prioritize read compactions with a smaller initial file size, since + // they will be cheaper to perform. + pc.score = -float64(outputOverlaps.SizeSum() + overlapSlice.SizeSum()) + return pc } diff --git a/compaction_picker_test.go b/compaction_picker_test.go index 481f4a6cd1..baa7e3d3b5 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -325,7 +325,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) { end: iEnd.UserKey, } - pc, retryLater := pickManualCompaction( + pc, retryLater := newPickedManualCompaction( pickerByScore.vers, opts, compactionEnv{ @@ -1413,17 +1413,15 @@ func TestCompactionPickerPickFile(t *testing.T) { d.mu.Lock() defer d.mu.Unlock() - // Use maybeScheduleCompactionPicker to take care of all of the - // initialization of the compaction-picking environment, but never - // pick a compaction; just call pickFile using the user-provided - // level. 
var lf manifest.LevelFile var ok bool - d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction { - p := untypedPicker.(*compactionPickerByScore) - lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum) - return nil - }) + env := d.makeCompactionEnv() + if env == nil { + return "unable to lock the DB for compaction picking" + } + p := d.mu.versions.picker.(*compactionPickerByScore) + lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum) + d.mu.versions.logUnlock() if !ok { return "(none)" } diff --git a/compaction_test.go b/compaction_test.go index edd6283c21..b096643731 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2836,3 +2836,107 @@ func TestCompactionErrorStats(t *testing.T) { d.mu.Unlock() require.NoError(t, d.Close()) } + +// TestPrioritizingCompactionPool tests the behavior of a compaction pool +// with a maximum global concurrency of 1, ensuring that compactions are +// scheduled in a correctly prioritized order. +func TestPrioritizingCompactionPool(t *testing.T) { + var dbs []*DB + var buf bytes.Buffer + var compactInfo *CompactionInfo + var lastDB int + + compactionPool := NewPrioritizingCompactionPool(1) + datadriven.RunTest(t, "testdata/prioritizing_compaction_pool", + func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "define": + dbIndex := len(dbs) + opts := (&Options{ + FS: vfs.NewMem(), + DebugCheck: DebugCheckLevels, + FormatMajorVersion: internalFormatNewest, + DisableAutomaticCompactions: true, + CompactionPool: compactionPool, + EventListener: &EventListener{ + CompactionEnd: func(info CompactionInfo) { + // Fix the duration and output for determinism. + info.TotalDuration = time.Millisecond + info.Output.Tables = nil + compactInfo = &info + lastDB = dbIndex + }, + }, + }).WithFSDefaults() + d, err := runDBDefineCmd(td, opts) + if err != nil { + return err.Error() + } + d.mu.Lock() + s := d.mu.versions.currentVersion().String() + d.mu.Unlock() + dbs = append(dbs, d) + return s + + case "allow-compactions": + for _, d := range dbs { + d.opts.DisableAutomaticCompactions = false + d.mu.Lock() + d.maybeScheduleCompaction() + d.mu.Unlock() + } + return "" + + case "compact": + // TODO: to make this test deterministic, we need to ensure that + // these manual compactions are scheduled in sequential order. + // However, d.Compact blocks until the compaction is complete, + // whereas we should only wait here until the compaction has + // been queued in d.mu.compact.manual. 
+ for _, arg := range td.CmdArgs { + parts := strings.Split(arg.Key, "-") + go dbs[0].Compact([]byte(parts[0]), []byte(parts[1]), false) + } + return "" + + case "ingest": + numTables := 12 + for i := range numTables { + key := i % 4 + path := fmt.Sprintf("ext%d", key) + f, err := dbs[1].opts.FS.Create(path, vfs.WriteCategoryUnspecified) + require.NoError(t, err) + w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{ + TableFormat: dbs[1].FormatMajorVersion().MaxTableFormat(), + }) + require.NoError(t, w.Set([]byte(fmt.Sprint(key)), nil)) + require.NoError(t, w.Close()) + require.NoError(t, dbs[1].Ingest(context.Background(), []string{path})) + } + return "" + + case "wait-for-compactions": + buf.Reset() + compactionPool.mu.Lock() + defer compactionPool.mu.Unlock() + + for compactionPool.compactingCount > 0 { + compactionPool.cond.Wait() + fmt.Fprintf(&buf, "dbs[%d] finished a compaction: %v\n", lastDB, compactInfo) + fmt.Fprint(&buf, "in progress: ") + var numInProgress []int + for _, d := range dbs { + d.mu.Lock() + numInProgress = append(numInProgress, d.mu.compact.compactingCount) + d.mu.Unlock() + } + fmt.Fprintf(&buf, "%v\n", numInProgress) + } + fmt.Println(buf.String()) + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/db.go b/db.go index 0fca0798ea..71eb0660de 100644 --- a/db.go +++ b/db.go @@ -323,6 +323,20 @@ type DB struct { // compactionShedulers.Wait() should not be called while the DB.mu is held. compactionSchedulers sync.WaitGroup + // compactionPool is responsible for scheduling both automatic and manual + // compactions. + // + // The compactionPool may enforce a global max compaction concurrency in a + // multi-store configuration. Each Pebble store (i.e. an instance of *DB) + // has its own per-store compaction concurrency which is controlled by + // opts.MaxConcurrentCompactions. However, in a multi-store configuration, + // disk I/O is a per-store resource while CPU is shared across stores. + // A significant portion of compaction is CPU-intensive, and so + // CompactionPool may be used to ensure that excessive compactions don't + // interrupt foreground CPU tasks even if the disks are capable of handling + // the additional throughput from those compactions. + compactionPool CompactionPool + // The main mutex protecting internal DB state. This mutex encompasses many // fields because those fields need to be accessed and updated atomically. In // particular, the current version, log.*, mem.*, and snapshot list need to diff --git a/db_test.go b/db_test.go index 867f6f36b3..bd79bce9b5 100644 --- a/db_test.go +++ b/db_test.go @@ -1185,6 +1185,13 @@ func TestDBConcurrentCompactClose(t *testing.T) { require.NoError(t, d.Ingest(context.Background(), []string{path})) } + d.mu.Lock() + fmt.Println(d.mu.compact.compactingCount) + for c := range d.mu.compact.inProgress { + fmt.Println(c) + } + d.mu.Unlock() + require.NoError(t, d.Close()) } } diff --git a/format_major_version.go b/format_major_version.go index 90ef9f0b6a..252d18c4c9 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -399,13 +399,13 @@ func (d *DB) writeFormatVersionMarker(formatVers FormatMajorVersion) error { // waiting for compactions to complete (or for slots to free up). 
func (d *DB) compactMarkedFilesLocked() error { curr := d.mu.versions.currentVersion() + if curr.Stats.MarkedForCompaction == 0 { + return nil + } + // Attempt to schedule a compaction to rewrite a file marked for + // compaction. + d.maybeScheduleCompaction() for curr.Stats.MarkedForCompaction > 0 { - // Attempt to schedule a compaction to rewrite a file marked for - // compaction. - d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction { - return picker.pickRewriteCompaction(env) - }) - // The above attempt might succeed and schedule a rewrite compaction. Or // there might not be available compaction concurrency to schedule the // compaction. Or compaction of the file might have already been in diff --git a/open.go b/open.go index 72de7a0c6f..1985416b28 100644 --- a/open.go +++ b/open.go @@ -548,6 +548,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) { } d.updateReadStateLocked(d.opts.DebugCheck) + d.compactionPool = opts.CompactionPool + if !d.opts.ReadOnly { // If the Options specify a format major version higher than the // loaded database's, upgrade it. If this is a new database, this diff --git a/options.go b/options.go index 21a7b92563..2c5493a7d2 100644 --- a/options.go +++ b/options.go @@ -975,6 +975,12 @@ type Options struct { // The default value is 1. MaxConcurrentCompactions func() int + // CompactionPool is responsible for scheduling both automatic and manual + // compactions. In the case of multiple DB instances (i.e. a multi-store + // configuration), a CompactionPool may be used to enforce a global maximum + // compaction concurrency. + CompactionPool CompactionPool + // MaxConcurrentDownloads specifies the maximum number of download // compactions. These are compactions that copy an external file to the local // store. @@ -1268,6 +1274,9 @@ func (o *Options) EnsureDefaults() *Options { if o.MaxConcurrentCompactions == nil { o.MaxConcurrentCompactions = func() int { return 1 } } + if o.CompactionPool == nil { + o.CompactionPool = defaultCompactionPool + } if o.MaxConcurrentDownloads == nil { o.MaxConcurrentDownloads = func() int { return 1 } } diff --git a/snapshot.go b/snapshot.go index 1be35931c4..6ebf88c090 100644 --- a/snapshot.go +++ b/snapshot.go @@ -117,7 +117,7 @@ func (s *Snapshot) closeLocked() error { // If s was the previous earliest snapshot, we might be able to reclaim // disk space by dropping obsolete records that were pinned by s. if e := s.db.mu.snapshots.earliest(); e > s.seqNum { - s.db.maybeScheduleCompactionPicker(pickElisionOnly) + s.db.maybeScheduleCompaction() } s.db = nil return nil diff --git a/testdata/prioritizing_compaction_pool b/testdata/prioritizing_compaction_pool new file mode 100644 index 0000000000..4020b4bfee --- /dev/null +++ b/testdata/prioritizing_compaction_pool @@ -0,0 +1,110 @@ +# Define 3 DBs and ensure compactions occur in the correctly prioritized order. +# - dbs[0] has overlapping sstables in L5 and L6 and will be manually compacted. +# - dbs[1] is empty initially but will have overlapping files ingested into it, +# triggering automatic compactions. +# - dbs[2] has multiple sstables in L6 which only contain DELs, which triggers +# an elision-only compaction. 
+ +# dbs[0] +define +L5 + a.SET.2: + b.SET.2: +L5 + c.SET.2: + d.SET.2: +L5 + e.SET.2: + f.SET.2: +L6 + a.SET.1: + b.SET.1: +L6 + c.SET.1: + d.SET.1: +L6 + e.SET.1: + f.SET.1: +---- +L5: + 000004:[a#2,SET-b#2,SET] + 000005:[c#2,SET-d#2,SET] + 000006:[e#2,SET-f#2,SET] +L6: + 000007:[a#1,SET-b#1,SET] + 000008:[c#1,SET-d#1,SET] + 000009:[e#1,SET-f#1,SET] + +# dbs[1] +define +---- + +ingest +---- + +# dbs[2] +define +L6 + a.DEL.1: +L6 + b.DEL.2: +L6 + c.DEL.3: +L6 + d.DEL.4: +L6 + e.DEL.5: +L6 + f.DEL.6: +---- +L6: + 000004:[a#1,DEL-a#1,DEL] + 000005:[b#2,DEL-b#2,DEL] + 000006:[c#3,DEL-c#3,DEL] + 000007:[d#4,DEL-d#4,DEL] + 000008:[e#5,DEL-e#5,DEL] + 000009:[f#6,DEL-f#6,DEL] + +# Enable automatic compactions on all DBs. +allow-compactions +---- + +# Asynchronously start manual compactions for dbs[0]. +compact a-b c-d e-f +---- + +# The compaction pool has a maximum concurrency of 1. +# +# Since automatic compactions were enabled first, a default compaction from +# dbs[1] should be first to complete. Second, all of the manual compactions from +# dbs[0] should be prioritized. Third, the remaining default compactions +# resulting from files ingested into dbs[1]. Finally, the elision-only compactions +# from dbs[2] should +wait-for-compactions +---- +dbs[1] finished a compaction: [JOB 15] compacted(default) L0 [000008 000012] (1.2KB) Score=100.00 + L6 [000004] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [1 0 0] +dbs[0] finished a compaction: [JOB 4] compacted(default) L5 [000004] (594B) Score=0.00 + L6 [000007] (594B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [1 0 0] +dbs[0] finished a compaction: [JOB 5] compacted(default) L5 [000005] (594B) Score=0.00 + L6 [000008] (594B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [1 0 0] +dbs[0] finished a compaction: [JOB 6] compacted(default) L5 [000006] (594B) Score=0.00 + L6 [000009] (594B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 1 0] +dbs[1] finished a compaction: [JOB 16] compacted(default) L0 [000009 000013] (1.2KB) Score=100.00 + L6 [000005] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 1 0] +dbs[1] finished a compaction: [JOB 17] compacted(default) L0 [000010 000014] (1.2KB) Score=100.00 + L6 [000006] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 1 0] +dbs[1] finished a compaction: [JOB 18] compacted(default) L0 [000011 000015] (1.2KB) Score=100.00 + L6 [000007] (590B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 5] compacted(elision-only) L6 [000004] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 6] compacted(elision-only) L6 [000005] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 7] compacted(elision-only) L6 [000006] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 8] compacted(elision-only) L6 [000007] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 9] compacted(elision-only) L6 [000008] (628B) Score=0.00 + L6 [] (0B) 
Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 1] +dbs[2] finished a compaction: [JOB 10] compacted(elision-only) L6 [000009] (628B) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [] (0B), in 0.0s (0.0s total), output rate 0B/s +in progress: [0 0 0]
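
Usage note (not part of the patch): below is a minimal sketch of how a multi-store node might share one pool across its stores, assuming the API introduced above (the Options.CompactionPool field, NewPrioritizingCompactionPool, and the existing MaxConcurrentCompactions option). The store directories and concurrency values are illustrative only.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// One pool shared by all stores: at most 4 CPU-heavy compactions run
	// across the stores at any time. Each store additionally honors its own
	// per-store MaxConcurrentCompactions limit.
	pool := pebble.NewPrioritizingCompactionPool(4)

	// Hypothetical store directories for a three-store node.
	dirs := []string{"/mnt/store1", "/mnt/store2", "/mnt/store3"}

	var dbs []*pebble.DB
	for _, dir := range dirs {
		d, err := pebble.Open(dir, &pebble.Options{
			FS:                       vfs.Default,
			CompactionPool:           pool,
			MaxConcurrentCompactions: func() int { return 2 },
		})
		if err != nil {
			log.Fatalf("open %s: %v", dir, err)
		}
		dbs = append(dbs, d)
	}

	// ... serve traffic ...

	for _, d := range dbs {
		if err := d.Close(); err != nil {
			log.Fatalf("close: %v", err)
		}
	}
}

Passing the same pool to every Open call is what makes the limit global; when each store is given its own pool (or is left on the default UnlimitedCompactionPool), scheduling behavior matches the existing per-store logic.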