diff --git a/compaction_picker.go b/compaction_picker.go index 4724a4b4c8..18664cde52 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -639,39 +639,13 @@ func compensatedSize(f *fileMetadata) uint64 { return f.Size + fileCompensation(f) } -// compensatedSizeAnnotator implements manifest.Annotator, annotating B-Tree -// nodes with the sum of the files' compensated sizes. Its annotation type is -// a *uint64. Compensated sizes may change once a table's stats are loaded -// asynchronously, so its values are marked as cacheable only if a file's -// stats have been loaded. -type compensatedSizeAnnotator struct { -} - -var _ manifest.Annotator = compensatedSizeAnnotator{} - -func (a compensatedSizeAnnotator) Zero(dst interface{}) interface{} { - if dst == nil { - return new(uint64) - } - v := dst.(*uint64) - *v = 0 - return v -} - -func (a compensatedSizeAnnotator) Accumulate( - f *fileMetadata, dst interface{}, -) (v interface{}, cacheOK bool) { - vptr := dst.(*uint64) - *vptr = *vptr + compensatedSize(f) - return vptr, f.StatsValid() -} - -func (a compensatedSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} { - srcV := src.(*uint64) - dstV := dst.(*uint64) - *dstV = *dstV + *srcV - return dstV -} +// compensatedSizeAnnotator is a manifest.Annotator that annotates B-Tree +// nodes with the sum of the files' compensated sizes. Compensated sizes may +// change once a table's stats are loaded asynchronously, so its values are +// marked as cacheable only if a file's stats have been loaded. +var compensatedSizeAnnotator = manifest.SumAnnotator(func(f *fileMetadata) (uint64, bool) { + return compensatedSize(f), f.StatsValid() +}) // totalCompensatedSize computes the compensated size over a file metadata // iterator. Note that this function is linear in the files available to the @@ -912,10 +886,6 @@ func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]leve return sizeAdjust } -func levelCompensatedSize(lm manifest.LevelMetadata) uint64 { - return *lm.Annotation(compensatedSizeAnnotator{}).(*uint64) -} - func (p *compactionPickerByScore) calculateLevelScores( inProgressCompactions []compactionInfo, ) [numLevels]candidateLevelInfo { @@ -932,7 +902,7 @@ func (p *compactionPickerByScore) calculateLevelScores( } sizeAdjust := calculateSizeAdjust(inProgressCompactions) for level := 1; level < numLevels; level++ { - compensatedLevelSize := levelCompensatedSize(p.vers.Levels[level]) + sizeAdjust[level].compensated() + compensatedLevelSize := compensatedSizeAnnotator.LevelAnnotation(p.vers.Levels[level]) + sizeAdjust[level].compensated() scores[level].compensatedScore = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level]) scores[level].uncompensatedScore = float64(p.vers.Levels[level].Size()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level]) } @@ -1393,109 +1363,51 @@ func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics( } } -// elisionOnlyAnnotator implements the manifest.Annotator interface, -// annotating B-Tree nodes with the *fileMetadata of a file meeting the -// obsolete keys criteria for an elision-only compaction within the subtree. -// If multiple files meet the criteria, it chooses whichever file has the -// lowest LargestSeqNum. The lowest LargestSeqNum file will be the first -// eligible for an elision-only compaction once snapshots less than or equal -// to its LargestSeqNum are closed. -type elisionOnlyAnnotator struct{} - -var _ manifest.Annotator = elisionOnlyAnnotator{} - -func (a elisionOnlyAnnotator) Zero(interface{}) interface{} { - return nil -} - -func (a elisionOnlyAnnotator) Accumulate(f *fileMetadata, dst interface{}) (interface{}, bool) { - if f.IsCompacting() { - return dst, true - } - if !f.StatsValid() { - return dst, false - } - // Bottommost files are large and not worthwhile to compact just - // to remove a few tombstones. Consider a file ineligible if its - // own range deletions delete less than 10% of its data and its - // deletion tombstones make up less than 10% of its entries. - // - // TODO(jackson): This does not account for duplicate user keys - // which may be collapsed. Ideally, we would have 'obsolete keys' - // statistics that would include tombstones, the keys that are - // dropped by tombstones and duplicated user keys. See #847. - // - // Note that tables that contain exclusively range keys (i.e. no point keys, - // `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded - // from elision-only compactions. - // TODO(travers): Consider an alternative heuristic for elision of range-keys. - if f.Stats.RangeDeletionsBytesEstimate*10 < f.Size && - f.Stats.NumDeletions*10 <= f.Stats.NumEntries { - return dst, true - } - if dst == nil { - return f, true - } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum { - return f, true - } - return dst, true -} - -func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{} { - if v == nil { - return accum - } - // If we haven't accumulated an eligible file yet, or f's LargestSeqNum is - // less than the accumulated file's, use f. - if accum == nil { - return v - } - f := v.(*fileMetadata) - accumV := accum.(*fileMetadata) - if accumV == nil || accumV.LargestSeqNum > f.LargestSeqNum { - return f - } - return accumV -} - -// markedForCompactionAnnotator implements the manifest.Annotator interface, -// annotating B-Tree nodes with the *fileMetadata of a file that is marked for -// compaction within the subtree. If multiple files meet the criteria, it -// chooses whichever file has the lowest LargestSeqNum. -type markedForCompactionAnnotator struct{} - -var _ manifest.Annotator = markedForCompactionAnnotator{} - -func (a markedForCompactionAnnotator) Zero(interface{}) interface{} { - return nil -} - -func (a markedForCompactionAnnotator) Accumulate( - f *fileMetadata, dst interface{}, -) (interface{}, bool) { - if !f.MarkedForCompaction { - // Not marked for compaction; return dst. - return dst, true - } - return markedMergeHelper(f, dst) -} - -func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} { - if v == nil { - return accum - } - accum, _ = markedMergeHelper(v.(*fileMetadata), accum) - return accum -} - -// REQUIRES: f is non-nil, and f.MarkedForCompaction=true. -func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) { - if dst == nil { - return f, true - } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum { - return f, true - } - return dst, true +// elisionOnlyAnnotator is a manifest.Annotator that annotates B-Tree +// nodes with the *fileMetadata of a file meeting the obsolete keys criteria +// for an elision-only compaction within the subtree. If multiple files meet +// the criteria, it chooses whichever file has the lowest LargestSeqNum. The +// lowest LargestSeqNum file will be the first eligible for an elision-only +// compaction once snapshots less than or equal to its LargestSeqNum are closed. +var elisionOnlyAnnotator = &manifest.Annotator[*fileMetadata]{ + Aggregator: manifest.PickFileAggregator{ + Filter: func(f *fileMetadata) (eligible bool, cacheOK bool) { + // Bottommost files are large and not worthwhile to compact just + // to remove a few tombstones. Consider a file eligible only if + // either its own range deletions delete at least 10% of its data or + // its deletion tombstones make at least 10% of its entries. + // + // TODO(jackson): This does not account for duplicate user keys + // which may be collapsed. Ideally, we would have 'obsolete keys' + // statistics that would include tombstones, the keys that are + // dropped by tombstones and duplicated user keys. See #847. + // + // Note that tables that contain exclusively range keys (i.e. no point keys, + // `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded + // from elision-only compactions. + // TODO(travers): Consider an alternative heuristic for elision of range-keys. + deletionCriteria := f.Stats.RangeDeletionsBytesEstimate*10 >= f.Size || f.Stats.NumDeletions*10 > f.Stats.NumEntries + return !f.IsCompacting() && f.StatsValid() && deletionCriteria, f.StatsValid() + }, + Compare: func(f1 *fileMetadata, f2 *fileMetadata) bool { + return f1.LargestSeqNum < f2.LargestSeqNum + }, + }, +} + +// markedForCompactionAnnotator is a manifest.Annotator that annotates B-Tree +// nodes with the *fileMetadata of a file that is marked for compaction +// within the subtree. If multiple files meet the criteria, it chooses +// whichever file has the lowest LargestSeqNum. +var markedForCompactionAnnotator = &manifest.Annotator[*fileMetadata]{ + Aggregator: manifest.PickFileAggregator{ + Filter: func(f *fileMetadata) (eligible bool, cacheOK bool) { + return f.MarkedForCompaction, true + }, + Compare: func(f1 *fileMetadata, f2 *fileMetadata) bool { + return f1.LargestSeqNum < f2.LargestSeqNum + }, + }, } // pickElisionOnlyCompaction looks for compactions of sstables in the @@ -1506,11 +1418,10 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( if p.opts.private.disableElisionOnlyCompactions { return nil } - v := p.vers.Levels[numLevels-1].Annotation(elisionOnlyAnnotator{}) - if v == nil { + candidate := elisionOnlyAnnotator.LevelAnnotation(p.vers.Levels[numLevels-1]) + if candidate == nil { return nil } - candidate := v.(*fileMetadata) if candidate.IsCompacting() || candidate.LargestSeqNum >= env.earliestSnapshotSeqNum { return nil } @@ -1542,12 +1453,11 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction( // the input level. func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) { for l := numLevels - 1; l >= 0; l-- { - v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{}) - if v == nil { + candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l]) + if candidate == nil { // Try the next level. continue } - candidate := v.(*fileMetadata) if candidate.IsCompacting() { // Try the next level. continue diff --git a/compaction_picker_test.go b/compaction_picker_test.go index d305b1678f..bd2edb32d8 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -563,7 +563,7 @@ func TestCompactionPickerL0(t *testing.T) { } f.MarkedForCompaction = true picker.vers.Stats.MarkedForCompaction++ - picker.vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + markedForCompactionAnnotator.InvalidateLevelAnnotation(picker.vers.Levels[l]) return fmt.Sprintf("marked L%d.%s", l, f.FileNum) } } diff --git a/compaction_test.go b/compaction_test.go index 4a7e95ddbc..68be949158 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2505,7 +2505,7 @@ func TestMarkedForCompaction(t *testing.T) { } f.MarkedForCompaction = true vers.Stats.MarkedForCompaction++ - vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + markedForCompactionAnnotator.InvalidateLevelAnnotation(vers.Levels[l]) return fmt.Sprintf("marked L%d.%s", l, f.FileNum) } } diff --git a/db.go b/db.go index d879f68825..bf36a3e88e 100644 --- a/db.go +++ b/db.go @@ -1995,8 +1995,8 @@ func (d *DB) Metrics() *Metrics { metrics.private.optionsFileSize = d.optionsFileSize // TODO(jackson): Consider making these metrics optional. - metrics.Keys.RangeKeySetsCount = countRangeKeySetFragments(vers) - metrics.Keys.TombstoneCount = countTombstones(vers) + metrics.Keys.RangeKeySetsCount = rangeKeySetsAnnotator.MultiLevelAnnotation(vers.RangeKeyLevels[:]) + metrics.Keys.TombstoneCount = tombstonesAnnotator.MultiLevelAnnotation(vers.Levels[:]) d.mu.versions.logLock() metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size()) @@ -2014,12 +2014,12 @@ func (d *DB) Metrics() *Metrics { metrics.Flush.NumInProgress = 1 } for i := 0; i < numLevels; i++ { - metrics.Levels[i].Additional.ValueBlocksSize = valueBlocksSizeForLevel(vers, i) - unknown, snappy, none, zstd := compressionTypesForLevel(vers, i) - metrics.Table.CompressedCountUnknown += int64(unknown) - metrics.Table.CompressedCountSnappy += int64(snappy) - metrics.Table.CompressedCountZstd += int64(zstd) - metrics.Table.CompressedCountNone += int64(none) + metrics.Levels[i].Additional.ValueBlocksSize = valueBlockSizeAnnotator.LevelAnnotation(vers.Levels[i]) + compressionTypes := compressionTypeAnnotator.LevelAnnotation(vers.Levels[i]) + metrics.Table.CompressedCountUnknown += int64(compressionTypes.unknown) + metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy) + metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd) + metrics.Table.CompressedCountNone += int64(compressionTypes.none) } d.mu.Unlock() diff --git a/format_major_version.go b/format_major_version.go index b7c501f79d..90ef9f0b6a 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -517,7 +517,7 @@ func (d *DB) markFilesLocked(findFn findFilesFunc) error { // annotations will be out of date. Clear the compaction-picking // annotation, so that it's recomputed the next time the compaction // picker looks for a file marked for compaction. - vers.Levels[l].InvalidateAnnotation(markedForCompactionAnnotator{}) + markedForCompactionAnnotator.InvalidateLevelAnnotation(vers.Levels[l]) } // The 'marked-for-compaction' bit is persisted in the MANIFEST file diff --git a/internal/manifest/annotator.go b/internal/manifest/annotator.go new file mode 100644 index 0000000000..f0240afabe --- /dev/null +++ b/internal/manifest/annotator.go @@ -0,0 +1,313 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package manifest + +// The Annotator type defined below is used by other packages to lazily +// compute a value over a B-Tree. Each node of the B-Tree stores one +// `annotation` per annotator, containing the result of the computation over +// the node's subtree. +// +// An annotation is marked as valid if it's current with the current subtree +// state. Annotations are marked as invalid whenever a node will be mutated +// (in mut). Annotators may also return `false` from `Accumulate` to signal +// that a computation for a file is not stable and may change in the future. +// Annotations that include these unstable values are also marked as invalid +// on the node, ensuring that future queries for the annotation will recompute +// the value. + +// An Annotator defines a computation over a level's FileMetadata. If the +// computation is stable and uses inputs that are fixed for the lifetime of +// a FileMetadata, the LevelMetadata's internal data structures are annotated +// with the intermediary computations. This allows the computation to be +// computed incrementally as edits are applied to a level. +type Annotator[T any] struct { + Aggregator AnnotationAggregator[T] +} + +// An AnnotationAggregator defines how an annotation should be accumulated +// from a single FileMetadata and merged with other annotated values. +type AnnotationAggregator[T any] interface { + // Zero returns the zero value of an annotation. This value is returned + // when a LevelMetadata is empty. + Zero() T + + // Accumulate computes the annotation for a single file in a level's + // metadata. It also returns a bool flag indicating whether or not the + // value is stable and okay to cache as an annotation, which must be false + // if the file's value may change over the life of the file. + Accumulate(f *FileMetadata) (v T, cacheOK bool) + + // Merge combines two annotated values src and dst, returning the result. + Merge(v1 T, v2 T) T +} + +type annotation struct { + // annotator is a pointer to the Annotator that computed this annotation. + // NB: This is untyped to allow AnnotationAggregator to use Go generics, + // since annotations are stored in a slice on each node and a single + // slice cannot contain elements with different type parameters. + annotator interface{} + // vptr is a pointer to the annotation value, the output of either + // annotator.Value or annotator.Merge. + // NB: This is untyped for the same reason as annotator above. We store + // a pointer instead of the value itself in order to eliminate a heap + // allocation every time an annotation is computed. + vptr interface{} + // valid indicates whether future reads of the annotation may use the + // value as-is. If false, v will be zeroed and recalculated. + valid bool +} + +// findAnnotation finds this Annotator's annotation on a node, creating +// one if it doesn't already exist. +func (a *Annotator[T]) findAnnotation(n *node) *annotation { + for i := range n.annot { + if n.annot[i].annotator == a { + return &n.annot[i] + } + } + + // This node has never been annotated by a. Create a new annotation. + n.annot = append(n.annot, annotation{ + annotator: a, + vptr: new(T), + }) + return &n.annot[len(n.annot)-1] +} + +// nodeRangeAnnotation computes this annotator's annotation of this node across +// all files in the inclusive range [lowerBound, upperBound]. The second return +// value indicates whether the annotation is stable and thus cacheable. +func (a *Annotator[T]) nodeRangeAnnotation( + n *node, + cmp btreeCmp, + // NB: lowerBound and upperBound are passed as FileMetadata pointers + // because btreeCmp expects this type. + lowerBound *FileMetadata, + upperBound *FileMetadata, + // fullyWithinLowerBound and fullyWithinUpperBound indicate whether + // this node's subtree is already known to be within each bound. + fullyWithinLowerBound bool, + fullyWithinUpperBound bool, +) (v T, cacheOK bool) { + annot := a.findAnnotation(n) + // If the annotation is already marked as valid and this node's + // subtree is fully within the bounds, we can return it without + // recomputing anything. + if fullyWithinLowerBound && fullyWithinUpperBound && annot.valid { + return *annot.vptr.(*T), true + } + + aggregated := a.Aggregator.Zero() + valid := true + + left, right := 0, int(n.count) + overlapsRight := false + if !fullyWithinLowerBound { + left, _ = n.find(cmp, lowerBound) + } + if !fullyWithinUpperBound { + right, overlapsRight = n.find(cmp, upperBound) + } + + // Iterate over every item and child node which could overlap + // with the bounds. + for i := left; i <= right; i++ { + if !n.leaf { + v, ok := a.nodeRangeAnnotation( + n.children[i], + cmp, + lowerBound, + upperBound, + // The children at left and right may or may not be within + // the bounds, but all other children must be due to the + // btree's structure. + fullyWithinLowerBound || i > left, + fullyWithinUpperBound || i < right, + ) + aggregated = a.Aggregator.Merge(v, aggregated) + valid = valid && ok + } + + // Accumulate annotations from every item in the range [left, right). + // NB: We only accumulate from n.items[right] in the case that + // its smallest key is exactly equal to the upper bound. + if overlapsRight || i < right { + v, ok := a.Aggregator.Accumulate(n.items[i]) + aggregated = a.Aggregator.Merge(v, aggregated) + valid = valid && ok + } + } + + // Update this node's cached annotation only if we accumulated from + // this node's entire subtree. + if fullyWithinLowerBound && fullyWithinUpperBound { + *annot.vptr.(*T) = aggregated + annot.valid = valid + } + + return aggregated, valid +} + +// InvalidateAnnotation removes any existing cached annotations from this +// annotator from a node's subtree. +func (a *Annotator[T]) invalidateNodeAnnotation(n *node) { + annot := a.findAnnotation(n) + annot.valid = false + if !n.leaf { + for i := int16(0); i <= n.count; i++ { + a.invalidateNodeAnnotation(n.children[i]) + } + } +} + +// LevelAnnotation calculates the annotation defined by this Annotator for all +// files in the given LevelMetadata. A pointer to the Annotator is used as the +// key for pre-calculated values, so the same Annotator must be used to avoid +// duplicate computation. Annotation must not be called concurrently, and in +// practice this is achieved by requiring callers to hold DB.mu. +func (a *Annotator[T]) LevelAnnotation(lm LevelMetadata) T { + if lm.Empty() { + return a.Aggregator.Zero() + } + + v, _ := a.nodeRangeAnnotation(lm.tree.root, lm.tree.cmp, nil, nil, true, true) + return v +} + +// LevelAnnotation calculates the annotation defined by this Annotator for all +// files across the given levels. A pointer to the Annotator is used as the +// key for pre-calculated values, so the same Annotator must be used to avoid +// duplicate computation. Annotation must not be called concurrently, and in +// practice this is achieved by requiring callers to hold DB.mu. +func (a *Annotator[T]) MultiLevelAnnotation(lms []LevelMetadata) T { + aggregated := a.Aggregator.Zero() + for l := 0; l < len(lms); l++ { + if !lms[l].Empty() { + v := a.LevelAnnotation(lms[l]) + aggregated = a.Aggregator.Merge(v, aggregated) + } + } + return aggregated +} + +// LevelRangeAnnotation calculates the annotation defined by this Annotator for +// the files within LevelMetadata which are within the inclusive range +// [lowerBound, upperBound]. A pointer to the Annotator is used as the key for +// pre-calculated values, so the same Annotator must be used to avoid duplicate +// computation. Annotation must not be called concurrently, and in practice this +// is achieved by requiring callers to hold DB.mu. +func (a *Annotator[T]) LevelRangeAnnotation( + lm LevelMetadata, lowerBound InternalKey, upperBound InternalKey, +) T { + if lm.Empty() { + return a.Aggregator.Zero() + } + + // fake filemetadata + lowerBoundMeta := &FileMetadata{ + Smallest: lowerBound, + } + upperBoundMeta := &FileMetadata{ + Smallest: upperBound, + } + + // TODO: assert lm.tree.cmp is by smallest key (L0cmp) + + // we ignore ok because + v, _ := a.nodeRangeAnnotation(lm.tree.root, lm.tree.cmp, lowerBoundMeta, upperBoundMeta, false, false) + return v +} + +// InvalidateAnnotation clears any cached annotations defined by Annotator. A +// pointer to the Annotator is used as the key for pre-calculated values, so +// the same Annotator must be used to clear the appropriate cached annotation. +// InvalidateAnnotation must not be called concurrently, and in practice this +// is achieved by requiring callers to hold DB.mu. +func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) { + if lm.Empty() { + return + } + a.invalidateNodeAnnotation(lm.tree.root) +} + +// sumAggregator defines an Aggregator which sums together a uint64 value +// across files. +type sumAggregator struct { + accumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool) +} + +func (sa sumAggregator) Zero() uint64 { + return 0 +} + +func (sa sumAggregator) Accumulate(f *FileMetadata) (v uint64, cacheOK bool) { + return sa.accumulateFunc(f) +} + +func (sa sumAggregator) Merge(val1 uint64, val2 uint64) uint64 { + return val1 + val2 +} + +// SumAnnotator takes a function that computes a uint64 value from a single +// FileMetadata and returns an Annotator that sums together the values across +// files. +func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] { + return &Annotator[uint64]{ + Aggregator: sumAggregator{ + accumulateFunc: accumulate, + }, + } +} + +// NumFilesAnnotator is an Annotator which computes an annotation value +// equal to the number of files included in the annotation. Particularly, it +// can be used to efficiently calculate the number of files in a given key +// range using range annotations. +var NumFilesAnnotator = SumAnnotator(func(f *FileMetadata) (uint64, bool) { + return 1, true +}) + +// PickFileAggregator implements the AnnotationAggregator interface. It defines +// an aggregator that picks a single file from a set of eligible files. +type PickFileAggregator struct { + // Filter takes a FileMetadata and returns whether it is eligible to be + // picked by this PickFileAggregator. The second return value indicates + // whether this eligibility is stable and thus cacheable. + Filter func(f *FileMetadata) (eligible bool, cacheOK bool) + // Compare compares two instances of FileMetadata and returns true if + // the first one should be picked over the second one. It may assume + // that both arguments are non-nil. + Compare func(f1 *FileMetadata, f2 *FileMetadata) bool +} + +// Zero implements AnnotationAggregator.Zero, returning nil as the zero value. +func (fa PickFileAggregator) Zero() *FileMetadata { + return nil +} + +// Accumulate implements AnnotationAggregator.Accumulate, accumulating a single +// file as long as it is eligible to be picked. +func (fa PickFileAggregator) Accumulate(f *FileMetadata) (v *FileMetadata, cacheOK bool) { + eligible, ok := fa.Filter(f) + if eligible { + return f, ok + } + return nil, ok +} + +// Merge implements AnnotationAggregator.Merge by picking a single file based +// on the output of PickFileAggregator.Compare. +func (fa PickFileAggregator) Merge(f1 *FileMetadata, f2 *FileMetadata) *FileMetadata { + if f1 == nil { + return f2 + } else if f2 == nil { + return f1 + } + if fa.Compare(f1, f2) { + return f1 + } + return f2 +} diff --git a/internal/manifest/annotator_test.go b/internal/manifest/annotator_test.go new file mode 100644 index 0000000000..ee8f59e5f6 --- /dev/null +++ b/internal/manifest/annotator_test.go @@ -0,0 +1,118 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package manifest + +import ( + "math/rand" + "testing" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/stretchr/testify/require" +) + +func makeTestLevelMetadata(count int) (LevelMetadata, []*FileMetadata) { + files := make([]*FileMetadata, count) + for i := 0; i < count; i++ { + files[i] = newItem(key(i)) + } + + lm := MakeLevelMetadata(base.DefaultComparer.Compare, 6, files) + return lm, files +} + +func TestNumFilesAnnotator(t *testing.T) { + const count = 1000 + lm, _ := makeTestLevelMetadata(0) + + for i := 1; i <= count; i++ { + lm.tree.Insert(newItem(key(i))) + numFiles := NumFilesAnnotator.LevelAnnotation(lm) + require.EqualValues(t, i, numFiles) + } + + numFiles := NumFilesAnnotator.LevelAnnotation(lm) + require.EqualValues(t, count, numFiles) + + numFiles = NumFilesAnnotator.LevelAnnotation(lm) + require.EqualValues(t, count, numFiles) +} + +func BenchmarkNumFilesAnnotator(b *testing.B) { + lm, _ := makeTestLevelMetadata(0) + for i := 1; i <= b.N; i++ { + lm.tree.Insert(newItem(key(i))) + numFiles := NumFilesAnnotator.LevelAnnotation(lm) + require.EqualValues(b, i, numFiles) + } +} + +func TestNumFilesRangeAnnotationEmptyRanges(t *testing.T) { + lm, files := makeTestLevelMetadata(5_000) + + // Delete key ranges in the beginning and middle. + for i := 0; i < 100; i++ { + lm.tree.Delete(files[i]) + } + for i := 2400; i < 2600; i++ { + lm.tree.Delete(files[i]) + } + + // Ranges that are completely empty. + v := NumFilesAnnotator.LevelRangeAnnotation(lm, key(1), key(99)) + require.EqualValues(t, 0, v) + v = NumFilesAnnotator.LevelRangeAnnotation(lm, key(5001), key(6000)) + require.EqualValues(t, 0, v) + v = NumFilesAnnotator.LevelRangeAnnotation(lm, key(2450), key(2550)) + require.EqualValues(t, 0, v) + + // Partial overlaps with empty ranges. + v = NumFilesAnnotator.LevelRangeAnnotation(lm, key(0), key(100)) + require.EqualValues(t, 1, v) + v = NumFilesAnnotator.LevelRangeAnnotation(lm, key(2300), key(2700)) + require.EqualValues(t, 201, v) + v = NumFilesAnnotator.LevelRangeAnnotation(lm, key(2500), key(4000)) + require.EqualValues(t, 1401, v) + + // Range which only spans a single table. + v = NumFilesAnnotator.LevelRangeAnnotation(lm, key(2300), key(2300)) + require.EqualValues(t, 1, v) +} + +func TestNumFilesRangeAnnotationRandomized(t *testing.T) { + const count = 10_000 + const numIterations = 100_000 + lm, _ := makeTestLevelMetadata(count) + + rng := rand.New(rand.NewSource(int64(0))) + for i := 0; i < numIterations; i++ { + left := rng.Intn(count) + right := left + rng.Intn(count-left) + + v := NumFilesAnnotator.LevelRangeAnnotation(lm, key(left), key(right)) + + // There are right - left + 1 files overlapping the range + // [left, right] inclusive. + require.EqualValues(t, right-left+1, v) + } +} + +func BenchmarkNumFilesRangeAnnotation(b *testing.B) { + const count = 10_000 + lm, files := makeTestLevelMetadata(count) + + rng := rand.New(rand.NewSource(int64(0))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + left := rng.Intn(count) + right := left + rng.Intn(count-left) + + // Randomly delete and reinsert a key within the range to verify + // that range annotations are still fast despite small mutations. + toDelete := rng.Intn(count) + lm.tree.Delete(files[toDelete]) + NumFilesAnnotator.LevelRangeAnnotation(lm, key(left), key(right)) + lm.tree.Insert(files[toDelete]) + } +} diff --git a/internal/manifest/btree.go b/internal/manifest/btree.go index b098a179bf..350200b612 100644 --- a/internal/manifest/btree.go +++ b/internal/manifest/btree.go @@ -16,45 +16,6 @@ import ( "github.com/cockroachdb/pebble/internal/invariants" ) -// The Annotator type defined below is used by other packages to lazily -// compute a value over a B-Tree. Each node of the B-Tree stores one -// `annotation` per annotator, containing the result of the computation over -// the node's subtree. -// -// An annotation is marked as valid if it's current with the current subtree -// state. Annotations are marked as invalid whenever a node will be mutated -// (in mut). Annotators may also return `false` from `Accumulate` to signal -// that a computation for a file is not stable and may change in the future. -// Annotations that include these unstable values are also marked as invalid -// on the node, ensuring that future queries for the annotation will recompute -// the value. - -// An Annotator defines a computation over a level's FileMetadata. If the -// computation is stable and uses inputs that are fixed for the lifetime of -// a FileMetadata, the LevelMetadata's internal data structures are annotated -// with the intermediary computations. This allows the computation to be -// computed incrementally as edits are applied to a level. -type Annotator interface { - // Zero returns the zero value of an annotation. This value is returned - // when a LevelMetadata is empty. The dst argument, if non-nil, is an - // obsolete value previously returned by this Annotator and may be - // overwritten and reused to avoid a memory allocation. - Zero(dst interface{}) (v interface{}) - - // Accumulate computes the annotation for a single file in a level's - // metadata. It merges the file's value into dst and returns a bool flag - // indicating whether or not the value is stable and okay to cache as an - // annotation. If the file's value may change over the life of the file, - // the annotator must return false. - // - // Implementations may modify dst and return it to avoid an allocation. - Accumulate(m *FileMetadata, dst interface{}) (v interface{}, cacheOK bool) - - // Merge combines two values src and dst, returning the result. - // Implementations may modify dst and return it to avoid an allocation. - Merge(src interface{}, dst interface{}) interface{} -} - type btreeCmp func(*FileMetadata, *FileMetadata) int func btreeCmpSeqNum(a, b *FileMetadata) int { @@ -91,16 +52,6 @@ const ( minItems = degree - 1 ) -type annotation struct { - annotator Annotator - // v is an annotation value, the output of either - // annotator.Value or annotator.Merge. - v interface{} - // valid indicates whether future reads of the annotation may use v as-is. - // If false, v will be zeroed and recalculated. - valid bool -} - type leafNode struct { ref atomic.Int32 count int16 @@ -659,78 +610,6 @@ func (n *node) rebalanceOrMerge(i int) { } } -// InvalidateAnnotation removes any existing cached annotations for the provided -// annotator from this node's subtree. -func (n *node) InvalidateAnnotation(a Annotator) { - // Find this annotator's annotation on this node. - var annot *annotation - for i := range n.annot { - if n.annot[i].annotator == a { - annot = &n.annot[i] - } - } - - if annot != nil && annot.valid { - annot.valid = false - annot.v = a.Zero(annot.v) - } - if !n.leaf { - for i := int16(0); i <= n.count; i++ { - n.children[i].InvalidateAnnotation(a) - } - } -} - -// Annotation retrieves, computing if not already computed, the provided -// annotator's annotation of this node. The second return value indicates -// whether the future reads of this annotation may use the first return value -// as-is. If false, the annotation is not stable and may change on a subsequent -// computation. -func (n *node) Annotation(a Annotator) (interface{}, bool) { - // Find this annotator's annotation on this node. - var annot *annotation - for i := range n.annot { - if n.annot[i].annotator == a { - annot = &n.annot[i] - } - } - - // If it exists and is marked as valid, we can return it without - // recomputing anything. - if annot != nil && annot.valid { - return annot.v, true - } - - if annot == nil { - // This is n's first time being annotated by a. - // Create a new zeroed annotation. - n.annot = append(n.annot, annotation{ - annotator: a, - v: a.Zero(nil), - }) - annot = &n.annot[len(n.annot)-1] - } else { - // There's an existing annotation that must be recomputed. - // Zero its value. - annot.v = a.Zero(annot.v) - } - - annot.valid = true - for i := int16(0); i <= n.count; i++ { - if !n.leaf { - v, ok := n.children[i].Annotation(a) - annot.v = a.Merge(v, annot.v) - annot.valid = annot.valid && ok - } - if i < n.count { - v, ok := a.Accumulate(n.items[i], annot.v) - annot.v = v - annot.valid = annot.valid && ok - } - } - return annot.v, annot.valid -} - func (n *node) verifyInvariants() { recomputedSubtreeCount := int(n.count) if !n.leaf { diff --git a/internal/manifest/btree_test.go b/internal/manifest/btree_test.go index cce22a2563..1c4b2ed2cd 100644 --- a/internal/manifest/btree_test.go +++ b/internal/manifest/btree_test.go @@ -145,10 +145,10 @@ func (n *node) recurse(f func(child *node, pos int16)) { ////////////////////////////////////////// func key(i int) InternalKey { - if i < 0 || i > 99999 { + if i < 0 || i > 9999999 { panic("key out of bounds") } - return base.MakeInternalKey([]byte(fmt.Sprintf("%05d", i)), 0, base.InternalKeyKindSet) + return base.MakeInternalKey([]byte(fmt.Sprintf("%07d", i)), 0, base.InternalKeyKindSet) } func keyWithMemo(i int, memo map[int]InternalKey) InternalKey { @@ -534,56 +534,6 @@ func TestIterEndSentinel(t *testing.T) { require.True(t, iter.iter.valid()) } -type orderStatistic struct{} - -func (o orderStatistic) Zero(dst interface{}) interface{} { - if dst == nil { - return new(int) - } - v := dst.(*int) - *v = 0 - return v -} - -func (o orderStatistic) Accumulate(meta *FileMetadata, dst interface{}) (interface{}, bool) { - v := dst.(*int) - *v++ - return v, true -} - -func (o orderStatistic) Merge(src interface{}, dst interface{}) interface{} { - srcv := src.(*int) - dstv := dst.(*int) - *dstv = *dstv + *srcv - return dstv -} - -func TestAnnotationOrderStatistic(t *testing.T) { - const count = 1000 - ann := orderStatistic{} - - var tr btree - tr.cmp = cmp - for i := 1; i <= count; i++ { - require.NoError(t, tr.Insert(newItem(key(i)))) - - v, ok := tr.root.Annotation(ann) - require.True(t, ok) - vtyped := v.(*int) - require.Equal(t, i, *vtyped) - } - - v, ok := tr.root.Annotation(ann) - require.True(t, ok) - vtyped := v.(*int) - require.Equal(t, count, *vtyped) - - v, ok = tr.root.Annotation(ann) - vtyped = v.(*int) - require.True(t, ok) - require.Equal(t, count, *vtyped) -} - // TestRandomizedBTree tests a random set of Insert, Delete and iteration // operations, checking for equivalence with a map of filenums. func TestRandomizedBTree(t *testing.T) { diff --git a/internal/manifest/level_metadata.go b/internal/manifest/level_metadata.go index c3008e809b..7cba73e14a 100644 --- a/internal/manifest/level_metadata.go +++ b/internal/manifest/level_metadata.go @@ -140,31 +140,6 @@ func (lm *LevelMetadata) Find(cmp base.Compare, m *FileMetadata) LevelSlice { return LevelSlice{} } -// Annotation lazily calculates and returns the annotation defined by -// Annotator. The Annotator is used as the key for pre-calculated -// values, so equal Annotators must be used to avoid duplicate computations -// and cached annotations. Annotation must not be called concurrently, and in -// practice this is achieved by requiring callers to hold DB.mu. -func (lm *LevelMetadata) Annotation(annotator Annotator) interface{} { - if lm.Empty() { - return annotator.Zero(nil) - } - v, _ := lm.tree.root.Annotation(annotator) - return v -} - -// InvalidateAnnotation clears any cached annotations defined by Annotator. The -// Annotator is used as the key for pre-calculated values, so equal Annotators -// must be used to clear the appropriate cached annotation. InvalidateAnnotation -// must not be called concurrently, and in practice this is achieved by -// requiring callers to hold DB.mu. -func (lm *LevelMetadata) InvalidateAnnotation(annotator Annotator) { - if lm.Empty() { - return - } - lm.tree.root.InvalidateAnnotation(annotator) -} - // LevelFile holds a file's metadata along with its position // within a level of the LSM. type LevelFile struct { diff --git a/table_stats.go b/table_stats.go index a670dc1194..49acf1220c 100644 --- a/table_stats.go +++ b/table_stats.go @@ -989,206 +989,72 @@ func newCombinedDeletionKeyspanIter( return mIter, nil } -// rangeKeySetsAnnotator implements manifest.Annotator, annotating B-Tree nodes -// with the sum of the files' counts of range key fragments. Its annotation type -// is a *uint64. The count of range key sets may change once a table's stats are +// rangeKeySetsAnnotator is a manifest.Annotator that annotates B-Tree nodes +// with the sum of the files' counts of range key fragments. The count of range +// key sets may change once a table's stats are loaded asynchronously, so its +// values are marked as cacheable only if a file's stats have been loaded. +var rangeKeySetsAnnotator = manifest.SumAnnotator(func(f *manifest.FileMetadata) (uint64, bool) { + return f.Stats.NumRangeKeySets, f.StatsValid() +}) + +// tombstonesAnnotator is a manifest.Annotator that annotates B-Tree nodes +// with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDEL +// keys). The count of tombstones may change once a table's stats are loaded +// asynchronously, so its values are marked as cacheable only if a file's stats +// have been loaded. +var tombstonesAnnotator = manifest.SumAnnotator(func(f *manifest.FileMetadata) (uint64, bool) { + return f.Stats.NumDeletions, f.StatsValid() +}) + +// valueBlocksSizeAnnotator is a manifest.Annotator that annotates B-Tree +// nodes with the sum of the files' Properties.ValueBlocksSize. The value block +// size may change once a table's stats are loaded asynchronously, so its +// values are marked as cacheable only if a file's stats have been loaded. +var valueBlockSizeAnnotator = manifest.SumAnnotator(func(f *fileMetadata) (uint64, bool) { + return f.Stats.ValueBlocksSize, f.StatsValid() +}) + +// compressionTypeAnnotator is a manifest.Annotator that annotates B-tree +// nodes with the compression type of the file. Its annotation type is +// compressionTypes. The compression type may change once a table's stats are // loaded asynchronously, so its values are marked as cacheable only if a file's // stats have been loaded. -type rangeKeySetsAnnotator struct{} - -var _ manifest.Annotator = rangeKeySetsAnnotator{} - -func (a rangeKeySetsAnnotator) Zero(dst interface{}) interface{} { - if dst == nil { - return new(uint64) - } - v := dst.(*uint64) - *v = 0 - return v -} - -func (a rangeKeySetsAnnotator) Accumulate( - f *fileMetadata, dst interface{}, -) (v interface{}, cacheOK bool) { - vptr := dst.(*uint64) - *vptr = *vptr + f.Stats.NumRangeKeySets - return vptr, f.StatsValid() -} - -func (a rangeKeySetsAnnotator) Merge(src interface{}, dst interface{}) interface{} { - srcV := src.(*uint64) - dstV := dst.(*uint64) - *dstV = *dstV + *srcV - return dstV +var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{ + Aggregator: compressionTypeAggregator{}, } -// countRangeKeySetFragments counts the number of RANGEKEYSET keys across all -// files of the LSM. It only counts keys in files for which table stats have -// been loaded. It uses a b-tree annotator to cache intermediate values between -// calculations when possible. -func countRangeKeySetFragments(v *version) (count uint64) { - for l := 0; l < numLevels; l++ { - if v.RangeKeyLevels[l].Empty() { - continue - } - count += *v.RangeKeyLevels[l].Annotation(rangeKeySetsAnnotator{}).(*uint64) - } - return count -} - -// tombstonesAnnotator implements manifest.Annotator, annotating B-Tree nodes -// with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDELk -// eys). Its annotation type is a *uint64. The count of tombstones may change -// once a table's stats are loaded asynchronously, so its values are marked as -// cacheable only if a file's stats have been loaded. -type tombstonesAnnotator struct{} - -var _ manifest.Annotator = tombstonesAnnotator{} - -func (a tombstonesAnnotator) Zero(dst interface{}) interface{} { - if dst == nil { - return new(uint64) - } - v := dst.(*uint64) - *v = 0 - return v -} - -func (a tombstonesAnnotator) Accumulate( - f *fileMetadata, dst interface{}, -) (v interface{}, cacheOK bool) { - vptr := dst.(*uint64) - *vptr = *vptr + f.Stats.NumDeletions - return vptr, f.StatsValid() -} - -func (a tombstonesAnnotator) Merge(src interface{}, dst interface{}) interface{} { - srcV := src.(*uint64) - dstV := dst.(*uint64) - *dstV = *dstV + *srcV - return dstV -} - -// countTombstones counts the number of tombstone (DEL, SINGLEDEL and RANGEDEL) -// internal keys across all files of the LSM. It only counts keys in files for -// which table stats have been loaded. It uses a b-tree annotator to cache -// intermediate values between calculations when possible. -func countTombstones(v *version) (count uint64) { - for l := 0; l < numLevels; l++ { - if v.Levels[l].Empty() { - continue - } - count += *v.Levels[l].Annotation(tombstonesAnnotator{}).(*uint64) - } - return count -} - -// valueBlocksSizeAnnotator implements manifest.Annotator, annotating B-Tree -// nodes with the sum of the files' Properties.ValueBlocksSize. Its annotation -// type is a *uint64. The value block size may change once a table's stats are -// loaded asynchronously, so its values are marked as cacheable only if a -// file's stats have been loaded. -type valueBlocksSizeAnnotator struct{} - -var _ manifest.Annotator = valueBlocksSizeAnnotator{} - -func (a valueBlocksSizeAnnotator) Zero(dst interface{}) interface{} { - if dst == nil { - return new(uint64) - } - v := dst.(*uint64) - *v = 0 - return v -} - -func (a valueBlocksSizeAnnotator) Accumulate( - f *fileMetadata, dst interface{}, -) (v interface{}, cacheOK bool) { - vptr := dst.(*uint64) - *vptr = *vptr + f.Stats.ValueBlocksSize - return vptr, f.StatsValid() -} - -func (a valueBlocksSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} { - srcV := src.(*uint64) - dstV := dst.(*uint64) - *dstV = *dstV + *srcV - return dstV -} - -// valueBlocksSizeForLevel returns the Properties.ValueBlocksSize across all -// files for a level of the LSM. It only includes the size for files for which -// table stats have been loaded. It uses a b-tree annotator to cache -// intermediate values between calculations when possible. It must not be -// called concurrently. -// -// REQUIRES: 0 <= level <= numLevels. -func valueBlocksSizeForLevel(v *version, level int) (count uint64) { - if v.Levels[level].Empty() { - return 0 - } - return *v.Levels[level].Annotation(valueBlocksSizeAnnotator{}).(*uint64) -} - -// compressionTypeAnnotator implements manifest.Annotator, annotating B-tree -// nodes with the compression type of the file. Its annotation type is a -// *compressionTypes. The compression type may change once a table's stats are -// loaded asynchronously, so its values are marked as cacheable only if a file's -// stats have been loaded. -type compressionTypeAnnotator struct{} +type compressionTypeAggregator struct{} type compressionTypes struct { snappy, zstd, none, unknown uint64 } -var _ manifest.Annotator = compressionTypeAnnotator{} - -func (a compressionTypeAnnotator) Zero(dst interface{}) interface{} { - if dst == nil { - return new(compressionTypes) - } - v := dst.(*compressionTypes) - *v = compressionTypes{} - return v +func (a compressionTypeAggregator) Zero() compressionTypes { + return compressionTypes{} } -func (a compressionTypeAnnotator) Accumulate( - f *fileMetadata, dst interface{}, -) (v interface{}, cacheOK bool) { - vptr := dst.(*compressionTypes) +func (a compressionTypeAggregator) Accumulate(f *fileMetadata) (v compressionTypes, cacheOK bool) { + types := compressionTypes{} switch f.Stats.CompressionType { case sstable.SnappyCompression: - vptr.snappy++ + types.snappy = 1 case sstable.ZstdCompression: - vptr.zstd++ + types.zstd = 1 case sstable.NoCompression: - vptr.none++ + types.none = 1 default: - vptr.unknown++ + types.unknown = 1 } - return vptr, f.StatsValid() -} - -func (a compressionTypeAnnotator) Merge(src interface{}, dst interface{}) interface{} { - srcV := src.(*compressionTypes) - dstV := dst.(*compressionTypes) - dstV.snappy = dstV.snappy + srcV.snappy - dstV.zstd = dstV.zstd + srcV.zstd - dstV.none = dstV.none + srcV.none - dstV.unknown = dstV.unknown + srcV.unknown - return dstV + return types, f.StatsValid() } -// compressionTypesForLevel returns the count of sstables by compression type -// used for a level in the LSM. Sstables with compression type snappy or zstd -// are returned, while others are ignored. -func compressionTypesForLevel(v *version, level int) (unknown, snappy, none, zstd uint64) { - if v.Levels[level].Empty() { - return - } - compression := v.Levels[level].Annotation(compressionTypeAnnotator{}).(*compressionTypes) - if compression == nil { - return +func (a compressionTypeAggregator) Merge( + c1 compressionTypes, c2 compressionTypes, +) compressionTypes { + return compressionTypes{ + snappy: c1.snappy + c2.snappy, + zstd: c1.zstd + c2.zstd, + none: c1.none + c2.none, + unknown: c1.unknown + c2.unknown, } - return compression.unknown, compression.snappy, compression.none, compression.zstd }