Skip to content

Commit

Permalink
db: refactor disk usage estimate using range annotations
Browse files Browse the repository at this point in the history
This change updates `db.EstimateDiskUsage` to use range annotations to
estimate the disk usage of a key range. This should improve the
performance of repeated disk usage estimates for similar or identical
key ranges.

At the Cockroach layer we use `db.EstimateDiskUsage` in a few places,
most notably when [computing MVCC span stats](https://github.com/cockroachdb/cockroach/blob/master/pkg/server/span_stats_server.go#L217).

Informs: #3793
  • Loading branch information
anish-shanbhag committed Aug 13, 2024
1 parent 3419a64 commit b10bfd5
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 174 deletions.
108 changes: 43 additions & 65 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ type DB struct {
// validating is set to true when validation is running.
validating bool
}

// annotators contains various instances of manifest.Annotator which
// should be protected from concurrent access.
annotators struct {
totalSizeAnnotator *manifest.Annotator[uint64]
remoteSizeAnnotator *manifest.Annotator[uint64]
externalSizeAnnotator *manifest.Annotator[uint64]
}
}

// Normally equal to time.Now() but may be overridden in tests.
Expand Down Expand Up @@ -2226,6 +2234,31 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
return destLevels, nil
}

// makeFileSizeAnnotator builds an annotator that sums the sizes of the files
// accepted by filter.
//
// For whole-file accumulation, an accepted file contributes its full Size. For
// a file that only partially overlaps the annotated bounds, an accepted file
// contributes the table cache's size estimate for the span
// [bounds.Start, bounds.End.Key]; if that estimate returns an error, the error
// is dropped and the file contributes 0. Files rejected by filter always
// contribute 0.
func (d *DB) makeFileSizeAnnotator(filter func(f *fileMetadata) bool) *manifest.Annotator[uint64] {
	// wholeFileSize is the contribution of a file accumulated in full. The
	// result is always reported as cacheable.
	wholeFileSize := func(f *fileMetadata) (uint64, bool) {
		if !filter(f) {
			return 0, true
		}
		return f.Size, true
	}

	// partialFileSize is the contribution of a file that only partially
	// overlaps bounds.
	partialFileSize := func(f *fileMetadata, bounds base.UserKeyBounds) uint64 {
		if !filter(f) {
			return 0
		}
		estimate, err := d.tableCache.estimateSize(f, bounds.Start, bounds.End.Key)
		if err != nil {
			// The callback has no way to report an error, so a failed
			// estimate counts as zero bytes.
			return 0
		}
		return estimate
	}

	return &manifest.Annotator[uint64]{
		Aggregator: manifest.SumAggregator{
			AccumulateFunc:               wholeFileSize,
			AccumulatePartialOverlapFunc: partialFileSize,
		},
	}
}

// EstimateDiskUsage returns the estimated filesystem space used in bytes for
// storing the range `[start, end]`. The estimation is computed as follows:
//
Expand All @@ -2252,7 +2285,9 @@ func (d *DB) EstimateDiskUsageByBackingType(
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.Comparer.Compare(start, end) > 0 {

bounds := base.UserKeyBoundsInclusive(start, end)
if !bounds.Valid(d.cmp) {
return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
}

Expand All @@ -2262,70 +2297,13 @@ func (d *DB) EstimateDiskUsageByBackingType(
readState := d.loadReadState()
defer readState.unref()

for level, files := range readState.current.Levels {
iter := files.Iter()
if level > 0 {
// We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
// expands the range iteratively until it has found a set of files that
// do not overlap any other L0 files outside that set.
overlaps := readState.current.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
iter = overlaps.Iter()
}
for file := iter.First(); file != nil; file = iter.Next() {
if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
// The range fully contains the file, so skip looking it up in
// table cache/looking at its indexes, and add the full file size.
meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err != nil {
return 0, 0, 0, err
}
if meta.IsRemote() {
remoteSize += file.Size
if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
externalSize += file.Size
}
}
totalSize += file.Size
} else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
var size uint64
var err error
if file.Virtual {
err = d.tableCache.withVirtualReader(
file.VirtualMeta(),
func(r sstable.VirtualReader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
},
)
} else {
err = d.tableCache.withReader(
file.PhysicalMeta(),
func(r *sstable.Reader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
},
)
}
if err != nil {
return 0, 0, 0, err
}
meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err != nil {
return 0, 0, 0, err
}
if meta.IsRemote() {
remoteSize += size
if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
externalSize += size
}
}
totalSize += size
}
}
}
return totalSize, remoteSize, externalSize, nil
d.mu.Lock()
defer d.mu.Unlock()

totalSize = *d.mu.annotators.totalSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
remoteSize = *d.mu.annotators.remoteSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
externalSize = *d.mu.annotators.externalSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
return
}

func (d *DB) walPreallocateSize() int {
Expand Down
187 changes: 176 additions & 11 deletions internal/manifest/annotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

package manifest

import (
"sort"

"github.com/cockroachdb/pebble/internal/base"
)

// The Annotator type defined below is used by other packages to lazily
// compute a value over a B-Tree. Each node of the B-Tree stores one
// `annotation` per annotator, containing the result of the computation over
Expand All @@ -24,6 +30,10 @@ package manifest
// computed incrementally as edits are applied to a level.
type Annotator[T any] struct {
// Aggregator defines how this annotator's values are zeroed, accumulated
// from individual files, and merged together.
Aggregator AnnotationAggregator[T]

// scratch is used to hold the aggregated annotation value when computing
// range annotations in order to avoid additional allocations. The range
// annotation methods return this pointer directly, so a returned value is
// overwritten by the next range annotation computed with the same Annotator.
scratch *T
}

// An AnnotationAggregator defines how an annotation should be accumulated
Expand All @@ -49,6 +59,14 @@ type AnnotationAggregator[T any] interface {
Merge(src *T, dst *T) *T
}

// A PartialOverlapAnnotationAggregator is an extension of AnnotationAggregator
// that allows for custom accumulation of range annotations for files that only
// partially overlap with the range.
type PartialOverlapAnnotationAggregator[T any] interface {
AnnotationAggregator[T]
// AccumulatePartialOverlap accumulates into dst the contribution of file f,
// which only partially overlaps bounds, and returns the accumulated value.
// When an Annotator's Aggregator implements this interface, range annotation
// computation calls this method instead of Accumulate for such files.
AccumulatePartialOverlap(f *FileMetadata, dst *T, bounds base.UserKeyBounds) *T
}

type annotation struct {
// annotator is a pointer to the Annotator that computed this annotation.
// NB: This is untyped to allow AnnotationAggregator to use Go generics,
Expand Down Expand Up @@ -116,6 +134,89 @@ func (a *Annotator[T]) nodeAnnotation(n *node) (_ *T, cacheOK bool) {
return t, annot.valid
}

// accumulateRangeAnnotation computes this annotator's annotation across all
// files in the node's subtree which overlap with the range defined by bounds.
// The computed annotation is accumulated into a.scratch.
//
// Results are merged into the existing value of a.scratch rather than
// replacing it, so callers must initialize a.scratch (the exported range
// annotation methods do so with Aggregator.Zero) before the first call.
func (a *Annotator[T]) accumulateRangeAnnotation(
n *node,
cmp base.Compare,
bounds base.UserKeyBounds,
// fullyWithinLowerBound and fullyWithinUpperBound indicate whether this
// node's subtree is already known to be within each bound.
fullyWithinLowerBound bool,
fullyWithinUpperBound bool,
) {
// If this node's subtree is fully within the bounds, compute a regular
// annotation.
if fullyWithinLowerBound && fullyWithinUpperBound {
// The cacheOK result of nodeAnnotation is not needed here; only the
// value is merged into the running total.
v, _ := a.nodeAnnotation(n)
a.scratch = a.Aggregator.Merge(v, a.scratch)
return
}

// We will accumulate annotations from each item in the end-exclusive
// range [leftItem, rightItem).
leftItem, rightItem := 0, int(n.count)
if !fullyWithinLowerBound {
// leftItem is the index of the first item that overlaps the lower bound.
leftItem = sort.Search(int(n.count), func(i int) bool {
return cmp(bounds.Start, n.items[i].Largest.UserKey) <= 0
})
}
if !fullyWithinUpperBound {
// rightItem is the index of the first item that does not overlap the
// upper bound.
rightItem = sort.Search(int(n.count), func(i int) bool {
return !bounds.End.IsUpperBoundFor(cmp, n.items[i].Smallest.UserKey)
})
}

// Accumulate annotations from every item that overlaps the bounds.
for i := leftItem; i < rightItem; i++ {
// Only the first and last overlapping items are tested for partial
// overlap; every item strictly between them is accumulated in full.
// NOTE(review): this presumably relies on the node's items being ordered
// by key so that interior items lie entirely within bounds — confirm.
if i == leftItem || i == rightItem-1 {
if agg, ok := a.Aggregator.(PartialOverlapAnnotationAggregator[T]); ok {
fb := n.items[i].UserKeyBounds()
// The file is treated as partially overlapping when it starts before
// bounds.Start or when bounds.End compares below the file's own end.
if cmp(bounds.Start, fb.Start) > 0 || bounds.End.CompareUpperBounds(cmp, fb.End) < 0 {
a.scratch = agg.AccumulatePartialOverlap(n.items[i], a.scratch, bounds)
continue
}
}
}
// The cacheOK result is discarded: range annotations are accumulated into
// scratch and are not stored on the node by this function.
v, _ := a.Aggregator.Accumulate(n.items[i], a.scratch)
a.scratch = v
}

if !n.leaf {
// We will accumulate annotations from each child in the end-inclusive
// range [leftChild, rightChild].
leftChild, rightChild := leftItem, rightItem
// If the lower bound overlaps with the child at leftItem, there is no
// need to accumulate annotations from the child to its left.
if leftItem < int(n.count) && cmp(bounds.Start, n.items[leftItem].Smallest.UserKey) >= 0 {
leftChild++
}
// If the upper bound spans beyond the child at rightItem, we must also
// accumulate annotations from the child to its right.
// NOTE(review): when rightItem < n.count it was produced by the search
// above, so items[rightItem].Smallest is already outside bounds.End; if
// Smallest <= Largest always holds, this condition looks unreachable —
// confirm against IsUpperBoundFor's semantics.
if rightItem < int(n.count) && bounds.End.IsUpperBoundFor(cmp, n.items[rightItem].Largest.UserKey) {
rightChild++
}

for i := leftChild; i <= rightChild; i++ {
a.accumulateRangeAnnotation(
n.children[i],
cmp,
bounds,
// If this child is to the right of leftItem, then its entire
// subtree is within the lower bound.
fullyWithinLowerBound || i > leftItem,
// If this child is to the left of rightItem, then its entire
// subtree is within the upper bound.
fullyWithinUpperBound || i < rightItem,
)
}
}
}

// InvalidateAnnotation removes any existing cached annotations from this
// annotator from a node's subtree.
func (a *Annotator[T]) invalidateNodeAnnotation(n *node) {
Expand All @@ -142,8 +243,8 @@ func (a *Annotator[T]) LevelAnnotation(lm LevelMetadata) *T {
return v
}

// LevelAnnotation calculates the annotation defined by this Annotator for all
// files across the given levels. A pointer to the Annotator is used as the
// MultiLevelAnnotation calculates the annotation defined by this Annotator for
// all files across the given levels. A pointer to the Annotator is used as the
// key for pre-calculated values, so the same Annotator must be used to avoid
// duplicate computation. Annotation must not be called concurrently, and in
// practice this is achieved by requiring callers to hold DB.mu.
Expand All @@ -158,6 +259,42 @@ func (a *Annotator[T]) MultiLevelAnnotation(lms []LevelMetadata) *T {
return aggregated
}

// LevelRangeAnnotation computes this Annotator's annotation over the files of
// lm that overlap the key range described by bounds.
//
// For a non-empty level the result is accumulated into, and returned as, the
// Annotator's scratch value, so it is overwritten by the next range annotation
// computed with the same Annotator. For an empty level a freshly zeroed value
// is returned and the scratch value is left untouched.
//
// A pointer to the Annotator is used as the key for pre-calculated values, so
// the same Annotator must be used to avoid duplicate computation. This method
// must not be called concurrently; in practice callers hold DB.mu.
func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKeyBounds) *T {
	if lm.Empty() {
		return a.Aggregator.Zero(nil)
	}

	root, cmp := lm.tree.root, lm.tree.cmp
	a.scratch = a.Aggregator.Zero(a.scratch)
	// Nothing is known yet about how the root relates to either bound.
	a.accumulateRangeAnnotation(
		root,
		cmp,
		bounds,
		false, /* fullyWithinLowerBound */
		false, /* fullyWithinUpperBound */
	)
	return a.scratch
}

// VersionRangeAnnotation computes this Annotator's annotation over every file
// in Version v that overlaps the key range described by bounds.
//
// L0 is visited through v.L0SublevelFiles, one sublevel at a time, and the
// remaining levels through v.Levels[1:]; empty slices are skipped. The result
// is accumulated into, and returned as, the Annotator's scratch value, so it
// is overwritten by the next range annotation computed with this Annotator.
func (a *Annotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserKeyBounds) *T {
	a.scratch = a.Aggregator.Zero(a.scratch)

	for _, sublevel := range v.L0SublevelFiles {
		if !sublevel.Empty() {
			a.accumulateRangeAnnotation(sublevel.iter.r, v.cmp.Compare, bounds, false, false)
		}
	}

	for _, lm := range v.Levels[1:] {
		if files := lm.Slice(); !files.Empty() {
			a.accumulateRangeAnnotation(files.iter.r, v.cmp.Compare, bounds, false, false)
		}
	}

	return a.scratch
}

// InvalidateAnnotation clears any cached annotations defined by Annotator. A
// pointer to the Annotator is used as the key for pre-calculated values, so
// the same Annotator must be used to clear the appropriate cached annotation.
Expand All @@ -170,27 +307,47 @@ func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
a.invalidateNodeAnnotation(lm.tree.root)
}

// sumAggregator defines an Aggregator which sums together a uint64 value
// SumAggregator defines an Aggregator which sums together a uint64 value
// across files.
type sumAggregator struct {
accumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
type SumAggregator struct {
AccumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
AccumulatePartialOverlapFunc func(f *FileMetadata, bounds base.UserKeyBounds) uint64
}

func (sa sumAggregator) Zero(dst *uint64) *uint64 {
// Zero implements AnnotationAggregator.Zero. If dst is non-nil it is reset to 0
// and returned, reusing the caller's storage; otherwise a newly allocated
// uint64 holding 0 is returned.
func (sa SumAggregator) Zero(dst *uint64) *uint64 {
	if dst != nil {
		*dst = 0
		return dst
	}
	return new(uint64)
}

func (sa sumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
accumulated, ok := sa.accumulateFunc(f)
// Accumulate implements AnnotationAggregator.Accumulate. It adds the value that
// AccumulateFunc reports for f to *dst and returns dst along with
// AccumulateFunc's cacheOK result.
func (sa SumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
	var delta uint64
	delta, cacheOK = sa.AccumulateFunc(f)
	*dst += delta
	return dst, cacheOK
}

func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
// AccumulatePartialOverlap implements
// PartialOverlapAnnotationAggregator.AccumulatePartialOverlap for a file f that
// only partially overlaps bounds. When AccumulatePartialOverlapFunc is set, its
// result for (f, bounds) is added to *dst. When it is nil, the file is
// accumulated exactly as Accumulate would, and Accumulate's cacheOK result is
// discarded. dst is returned in both cases.
func (sa SumAggregator) AccumulatePartialOverlap(
	f *FileMetadata, dst *uint64, bounds base.UserKeyBounds,
) *uint64 {
	if sa.AccumulatePartialOverlapFunc != nil {
		*dst += sa.AccumulatePartialOverlapFunc(f, bounds)
		return dst
	}
	// No partial-overlap function configured: fall back to the whole-file
	// accumulation.
	sum, _ := sa.Accumulate(f, dst)
	return sum
}

// Merge implements AnnotationAggregator.Merge: it adds *src into *dst and
// returns dst. src is left unmodified.
func (sa SumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
	*dst = *dst + *src
	return dst
}
Expand All @@ -200,12 +357,20 @@ func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
// files.
func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
return &Annotator[uint64]{
Aggregator: sumAggregator{
accumulateFunc: accumulate,
Aggregator: SumAggregator{
AccumulateFunc: accumulate,
},
}
}

// NumFilesAnnotator is an Annotator whose annotation value is the number of
// files covered by the annotation: each file contributes exactly 1, and the
// contribution is always reported as cacheable. Combined with range
// annotations, it can be used to efficiently count the files in a given key
// range.
var NumFilesAnnotator = SumAnnotator(func(*FileMetadata) (count uint64, cacheOK bool) {
	return 1, true
})

// PickFileAggregator implements the AnnotationAggregator interface. It defines
// an aggregator that picks a single file from a set of eligible files.
type PickFileAggregator struct {
Expand Down
Loading

0 comments on commit b10bfd5

Please sign in to comment.