Skip to content

Commit

Permalink
no orphan multi version
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed May 30, 2023
1 parent 2491243 commit d252e54
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 26 deletions.
39 changes: 33 additions & 6 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type MutableTree struct {
unsavedFastNodeRemovals map[string]interface{} // FastNodes that have not yet been removed from disk
ndb *nodeDB
skipFastStorageUpgrade bool // If true, the tree will work like no fast storage and always not upgrade fast storage
versionsToKeep int64
orphandb *orphanDB
mtx *sync.Mutex
}

Expand All @@ -60,6 +62,8 @@ func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastSto
unsavedFastNodeRemovals: make(map[string]interface{}),
ndb: ndb,
skipFastStorageUpgrade: skipFastStorageUpgrade,
versionsToKeep: opts.VersionsToKeep,
orphandb: NewOrphanDB(opts),
mtx: &sync.Mutex{},
}, nil
}
Expand Down Expand Up @@ -951,12 +955,38 @@ func (tree *MutableTree) saveFastNodeVersion() error {
return tree.ndb.setFastStorageVersionToBatch()
}

func (tree *MutableTree) handleOrphans(version int64) error {
if tree.versionsToKeep == 0 {
return tree.ndb.SaveOrphans(version, tree.orphans)
}

if tree.versionsToKeep == 1 {
for orphan := range tree.orphans {
if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil {
return err
}
}
return nil
}

if err := tree.orphandb.SaveOrphans(version, tree.orphans); err != nil {
return err
}
oldOrphans := tree.orphandb.GetOrphans(version - tree.versionsToKeep + 1)
for orphan := range oldOrphans {
if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil {
return err
}
}
return tree.orphandb.DeleteOrphans(version - tree.versionsToKeep + 1)
}

func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool) (int64, error) {
if tree.root == nil {
// There can still be orphans, for example if the root is the node being
// removed.
logger.Debug("SAVE EMPTY TREE %v\n", version)
if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil {
if err := tree.handleOrphans(version); err != nil {
return 0, err
}
if err := tree.ndb.SaveEmptyRoot(version); !silentSaveRootError && err != nil {
Expand All @@ -967,7 +997,7 @@ func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool)
if _, err := tree.ndb.SaveBranch(tree.root); err != nil {
return 0, err
}
if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil {
if err := tree.handleOrphans(version); err != nil {
return 0, err
}
if err := tree.ndb.SaveRoot(tree.root, version); !silentSaveRootError && err != nil {
Expand Down Expand Up @@ -1070,10 +1100,7 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
func (tree *MutableTree) DeleteVersions(versions ...int64) error {
logger.Debug("DELETING VERSIONS: %v\n", versions)

if tree.ndb.ShouldNotUseVersion() {
// no need to delete versions since there is no version to be
// deleted except the current one, which shouldn't be deleted
// in any circumstance
if tree.versionsToKeep > 0 {
return nil
}

Expand Down
16 changes: 0 additions & 16 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,18 +634,6 @@ func (ndb *nodeDB) SaveOrphans(version int64, orphans map[string]int64) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()

// instead of saving orphan metadata and later read orphan metadata->delete
// orphan data->delete orphan metadata, we directly delete orphan data here
// without doing anything for orphan metadata, if versioning is not needed.
if ndb.ShouldNotUseVersion() {
for orphan := range orphans {
if err := ndb.deleteOrphanedData([]byte(orphan)); err != nil {
return err
}
}
return nil
}

toVersion, err := ndb.getPreviousVersion(version)
if err != nil {
return err
Expand Down Expand Up @@ -1073,10 +1061,6 @@ func (ndb *nodeDB) traverseNodes(fn func(hash []byte, node *Node) error) error {
return nil
}

func (ndb *nodeDB) ShouldNotUseVersion() bool {
return ndb.opts.NoVersioning
}

func (ndb *nodeDB) String() (string, error) {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
Expand Down
12 changes: 8 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ type Options struct {
// When Stat is not nil, statistical logic needs to be executed
Stat *Statistics

// When set to true, the DB will only keep the most recent version and immediately delete
// obsolete data upon new data's commit
NoVersioning bool
VersionsToKeep int64

NumOrphansPerFile int

OrphanDirectory string
}

// DefaultOptions returns the default options for IAVL.
func DefaultOptions() Options {
return Options{}
return Options{
NumOrphansPerFile: 100000,
}
}
75 changes: 75 additions & 0 deletions orphandb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package iavl

import (
"fmt"
"io/fs"
"io/ioutil"
"os"
"path"
"strings"
)

type orphanDB struct {
cache map[int64]map[string]int64 // key: version, value: orphans
directory string
numOrphansPerFile int
}

func NewOrphanDB(opts *Options) *orphanDB {
return &orphanDB{
cache: map[int64]map[string]int64{},
directory: opts.OrphanDirectory,
numOrphansPerFile: opts.NumOrphansPerFile,
}
}

func (o *orphanDB) SaveOrphans(version int64, orphans map[string]int64) error {
o.cache[version] = orphans
chunks := [][]string{{}}
for orphan := range orphans {
if len(chunks[len(chunks)-1]) == o.numOrphansPerFile {
chunks = append(chunks, []string{})
}
chunks[len(chunks)-1] = append(chunks[len(chunks)-1], orphan)
}
dir := path.Join(o.directory, fmt.Sprintf("%d", version))
os.RemoveAll(dir)
os.MkdirAll(dir, fs.ModePerm)
for i, chunk := range chunks {
f, err := os.Create(path.Join(dir, fmt.Sprintf("%d", i)))
if err != nil {
return err
}
f.WriteString(strings.Join(chunk, "\n"))
f.Close()
}
return nil
}

func (o *orphanDB) GetOrphans(version int64) map[string]int64 {
if _, ok := o.cache[version]; !ok {
o.cache[version] = map[string]int64{}
dir := path.Join(o.directory, fmt.Sprintf("%d", version))
files, err := ioutil.ReadDir(dir)
if err != nil {
// no orphans found
return o.cache[version]
}
for _, file := range files {
content, err := ioutil.ReadFile(path.Join(dir, file.Name()))
if err != nil {
return o.cache[version]
}
for _, orphan := range strings.Split(string(content), "\n") {
o.cache[version][orphan] = version
}
}
}
return o.cache[version]
}

func (o *orphanDB) DeleteOrphans(version int64) error {
delete(o.cache, version)
dir := path.Join(o.directory, fmt.Sprintf("%d", version))
return os.RemoveAll(dir)
}

0 comments on commit d252e54

Please sign in to comment.