Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: refactor disk usage estimate using range annotations #3848

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 43 additions & 65 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,14 @@ type DB struct {
// validating is set to true when validation is running.
validating bool
}

// annotators contains various instances of manifest.Annotator which
// should be protected from concurrent access.
annotators struct {
totalSize *manifest.Annotator[uint64]
remoteSize *manifest.Annotator[uint64]
externalSize *manifest.Annotator[uint64]
}
}

// Normally equal to time.Now() but may be overridden in tests.
Expand Down Expand Up @@ -2228,6 +2236,31 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
return destLevels, nil
}

// makeFileSizeAnnotator returns an annotator that computes the total size of
// files that meet some criteria defined by filter.
func (d *DB) makeFileSizeAnnotator(filter func(f *fileMetadata) bool) *manifest.Annotator[uint64] {
return &manifest.Annotator[uint64]{
Aggregator: manifest.SumAggregator{
AccumulateFunc: func(f *fileMetadata) (uint64, bool) {
if filter(f) {
return f.Size, true
}
return 0, true
},
AccumulatePartialOverlapFunc: func(f *fileMetadata, bounds base.UserKeyBounds) uint64 {
if filter(f) {
size, err := d.tableCache.estimateSize(f, bounds.Start, bounds.End.Key)
if err != nil {
return 0
}
return size
}
return 0
},
},
}
}

// EstimateDiskUsage returns the estimated filesystem space used in bytes for
// storing the range `[start, end]`. The estimation is computed as follows:
//
Expand All @@ -2254,7 +2287,9 @@ func (d *DB) EstimateDiskUsageByBackingType(
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.Comparer.Compare(start, end) > 0 {

bounds := base.UserKeyBoundsInclusive(start, end)
if !bounds.Valid(d.cmp) {
return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
}

Expand All @@ -2264,70 +2299,13 @@ func (d *DB) EstimateDiskUsageByBackingType(
readState := d.loadReadState()
defer readState.unref()

for level, files := range readState.current.Levels {
iter := files.Iter()
if level > 0 {
// We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
// expands the range iteratively until it has found a set of files that
// do not overlap any other L0 files outside that set.
overlaps := readState.current.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
iter = overlaps.Iter()
}
for file := iter.First(); file != nil; file = iter.Next() {
if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
// The range fully contains the file, so skip looking it up in
// table cache/looking at its indexes, and add the full file size.
meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err != nil {
return 0, 0, 0, err
}
if meta.IsRemote() {
remoteSize += file.Size
if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
externalSize += file.Size
}
}
totalSize += file.Size
} else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
var size uint64
var err error
if file.Virtual {
err = d.tableCache.withVirtualReader(
file.VirtualMeta(),
func(r sstable.VirtualReader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
},
)
} else {
err = d.tableCache.withReader(
file.PhysicalMeta(),
func(r *sstable.Reader) (err error) {
size, err = r.EstimateDiskUsage(start, end)
return err
},
)
}
if err != nil {
return 0, 0, 0, err
}
meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err != nil {
return 0, 0, 0, err
}
if meta.IsRemote() {
remoteSize += size
if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
externalSize += size
}
}
totalSize += size
}
}
}
return totalSize, remoteSize, externalSize, nil
d.mu.Lock()
defer d.mu.Unlock()

totalSize = *d.mu.annotators.totalSize.VersionRangeAnnotation(readState.current, bounds)
remoteSize = *d.mu.annotators.remoteSize.VersionRangeAnnotation(readState.current, bounds)
externalSize = *d.mu.annotators.externalSize.VersionRangeAnnotation(readState.current, bounds)
return
}

func (d *DB) walPreallocateSize() int {
Expand Down
75 changes: 66 additions & 9 deletions internal/manifest/annotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ type AnnotationAggregator[T any] interface {
Merge(src *T, dst *T) *T
}

// A PartialOverlapAnnotationAggregator is an extension of AnnotationAggregator
// that allows for custom accumulation of range annotations for files that only
// partially overlap with the range.
type PartialOverlapAnnotationAggregator[T any] interface {
AnnotationAggregator[T]
AccumulatePartialOverlap(f *FileMetadata, dst *T, bounds base.UserKeyBounds) *T
}

type annotation struct {
// annotator is a pointer to the Annotator that computed this annotation.
// NB: This is untyped to allow AnnotationAggregator to use Go generics,
Expand Down Expand Up @@ -165,6 +173,15 @@ func (a *Annotator[T]) accumulateRangeAnnotation(

// Accumulate annotations from every item that overlaps the bounds.
for i := leftItem; i < rightItem; i++ {
if i == leftItem || i == rightItem-1 {
if agg, ok := a.Aggregator.(PartialOverlapAnnotationAggregator[T]); ok {
fb := n.items[i].UserKeyBounds()
if cmp(bounds.Start, fb.Start) > 0 || bounds.End.CompareUpperBounds(cmp, fb.End) < 0 {
a.scratch = agg.AccumulatePartialOverlap(n.items[i], a.scratch, bounds)
continue
}
}
}
v, _ := a.Aggregator.Accumulate(n.items[i], a.scratch)
a.scratch = v
}
Expand Down Expand Up @@ -258,6 +275,26 @@ func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKe
return a.scratch
}

// VersionRangeAnnotation calculates the annotation defined by this Annotator
// for all files within the given Version which are within the range
// defined by bounds.
func (a *Annotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserKeyBounds) *T {
accumulateSlice := func(ls LevelSlice) {
if ls.Empty() {
return
}
a.accumulateRangeAnnotation(ls.iter.r, v.cmp.Compare, bounds, false, false)
}
a.scratch = a.Aggregator.Zero(a.scratch)
for _, ls := range v.L0SublevelFiles {
accumulateSlice(ls)
}
for _, lm := range v.Levels[1:] {
accumulateSlice(lm.Slice())
}
return a.scratch
}

// InvalidateAnnotation clears any cached annotations defined by Annotator. A
// pointer to the Annotator is used as the key for pre-calculated values, so
// the same Annotator must be used to clear the appropriate cached annotation.
Expand All @@ -270,27 +307,47 @@ func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
a.invalidateNodeAnnotation(lm.tree.root)
}

// sumAggregator defines an Aggregator which sums together a uint64 value
// SumAggregator defines an Aggregator which sums together a uint64 value
// across files.
type sumAggregator struct {
accumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
type SumAggregator struct {
AccumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
AccumulatePartialOverlapFunc func(f *FileMetadata, bounds base.UserKeyBounds) uint64
}

func (sa sumAggregator) Zero(dst *uint64) *uint64 {
// Zero implements AnnotationAggregator.Zero, returning a new uint64 set to 0.
func (sa SumAggregator) Zero(dst *uint64) *uint64 {
if dst == nil {
return new(uint64)
}
*dst = 0
return dst
}

func (sa sumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
accumulated, ok := sa.accumulateFunc(f)
// Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
// file's uint64 value.
func (sa SumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
accumulated, ok := sa.AccumulateFunc(f)
*dst += accumulated
return dst, ok
}

func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
// AccumulatePartialOverlap implements
// PartialOverlapAnnotationAggregator.AccumulatePartialOverlap, accumulating a
// single file's uint64 value for a file which only partially overlaps with the
// range defined by bounds.
func (sa SumAggregator) AccumulatePartialOverlap(
f *FileMetadata, dst *uint64, bounds base.UserKeyBounds,
) *uint64 {
if sa.AccumulatePartialOverlapFunc == nil {
v, _ := sa.Accumulate(f, dst)
return v
}
*dst += sa.AccumulatePartialOverlapFunc(f, bounds)
return dst
}

// Merge implements AnnotationAggregator.Merge by summing two uint64 values.
func (sa SumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
*dst += *src
return dst
}
Expand All @@ -300,8 +357,8 @@ func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
// files.
func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
return &Annotator[uint64]{
Aggregator: sumAggregator{
accumulateFunc: accumulate,
Aggregator: SumAggregator{
AccumulateFunc: accumulate,
},
}
}
Expand Down
18 changes: 18 additions & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,24 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
d.newIters = d.tableCache.newIters
d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)

d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
return true
})
d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
if err != nil {
return false
}
return meta.IsRemote()
})
d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
meta, err := d.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
if err != nil {
return false
}
return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
})

var previousOptionsFileNum base.DiskFileNum
var previousOptionsFilename string
for _, filename := range ls {
Expand Down
26 changes: 5 additions & 21 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,27 +210,11 @@ func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
func (c *tableCacheContainer) estimateSize(
meta *fileMetadata, lower, upper []byte,
) (size uint64, err error) {
if meta.Virtual {
err = c.withVirtualReader(
meta.VirtualMeta(),
func(r sstable.VirtualReader) (err error) {
size, err = r.EstimateDiskUsage(lower, upper)
return err
},
)
} else {
err = c.withReader(
meta.PhysicalMeta(),
func(r *sstable.Reader) (err error) {
size, err = r.EstimateDiskUsage(lower, upper)
return err
},
)
}
if err != nil {
return 0, err
}
return size, nil
c.withCommonReader(meta, func(cr sstable.CommonReader) error {
size, err = cr.EstimateDiskUsage(lower, upper)
return err
})
return size, err
}

// createCommonReader creates a Reader for this file.
Expand Down
Loading