diff --git a/mutable_tree.go b/mutable_tree.go index f374a0e..7301f37 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -37,6 +37,8 @@ type MutableTree struct { unsavedFastNodeRemovals map[string]interface{} // FastNodes that have not yet been removed from disk ndb *nodeDB skipFastStorageUpgrade bool // If true, the tree will work like no fast storage and always not upgrade fast storage + versionsToKeep int64 + orphandb *orphanDB mtx *sync.Mutex } @@ -60,6 +62,8 @@ func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastSto unsavedFastNodeRemovals: make(map[string]interface{}), ndb: ndb, skipFastStorageUpgrade: skipFastStorageUpgrade, + versionsToKeep: opts.VersionsToKeep, + orphandb: NewOrphanDB(opts), mtx: &sync.Mutex{}, }, nil } @@ -951,12 +955,38 @@ func (tree *MutableTree) saveFastNodeVersion() error { return tree.ndb.setFastStorageVersionToBatch() } +func (tree *MutableTree) handleOrphans(version int64) error { + if tree.versionsToKeep == 0 { + return tree.ndb.SaveOrphans(version, tree.orphans) + } + + if tree.versionsToKeep == 1 { + for orphan := range tree.orphans { + if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil { + return err + } + } + return nil + } + + if err := tree.orphandb.SaveOrphans(version, tree.orphans); err != nil { + return err + } + oldOrphans := tree.orphandb.GetOrphans(version - tree.versionsToKeep + 1) + for orphan := range oldOrphans { + if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil { + return err + } + } + return tree.orphandb.DeleteOrphans(version - tree.versionsToKeep + 1) +} + func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool) (int64, error) { if tree.root == nil { // There can still be orphans, for example if the root is the node being // removed. logger.Debug("SAVE EMPTY TREE %v\n", version) - if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil { + if err := tree.handleOrphans(version); err != nil { return 0, err } if err := tree.ndb.SaveEmptyRoot(version); !silentSaveRootError && err != nil { @@ -967,7 +997,7 @@ func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool) if _, err := tree.ndb.SaveBranch(tree.root); err != nil { return 0, err } - if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil { + if err := tree.handleOrphans(version); err != nil { return 0, err } if err := tree.ndb.SaveRoot(tree.root, version); !silentSaveRootError && err != nil { @@ -1070,10 +1100,7 @@ func (tree *MutableTree) SetInitialVersion(version uint64) { func (tree *MutableTree) DeleteVersions(versions ...int64) error { logger.Debug("DELETING VERSIONS: %v\n", versions) - if tree.ndb.ShouldNotUseVersion() { - // no need to delete versions since there is no version to be - // deleted except the current one, which shouldn't be deleted - // in any circumstance + if tree.versionsToKeep > 0 { return nil } diff --git a/nodedb.go b/nodedb.go index 49bc846..18bd44a 100644 --- a/nodedb.go +++ b/nodedb.go @@ -634,18 +634,6 @@ func (ndb *nodeDB) SaveOrphans(version int64, orphans map[string]int64) error { ndb.mtx.Lock() defer ndb.mtx.Unlock() - // instead of saving orphan metadata and later read orphan metadata->delete - // orphan data->delete orphan metadata, we directly delete orphan data here - // without doing anything for orphan metadata, if versioning is not needed. - if ndb.ShouldNotUseVersion() { - for orphan := range orphans { - if err := ndb.deleteOrphanedData([]byte(orphan)); err != nil { - return err - } - } - return nil - } - toVersion, err := ndb.getPreviousVersion(version) if err != nil { return err @@ -1073,10 +1061,6 @@ func (ndb *nodeDB) traverseNodes(fn func(hash []byte, node *Node) error) error { return nil } -func (ndb *nodeDB) ShouldNotUseVersion() bool { - return ndb.opts.NoVersioning -} - func (ndb *nodeDB) String() (string, error) { buf := bufPool.Get().(*bytes.Buffer) defer bufPool.Put(buf) diff --git a/options.go b/options.go index 58dfcab..a1ad60d 100644 --- a/options.go +++ b/options.go @@ -82,12 +82,16 @@ type Options struct { // When Stat is not nil, statistical logic needs to be executed Stat *Statistics - // When set to true, the DB will only keep the most recent version and immediately delete - // obsolete data upon new data's commit - NoVersioning bool + VersionsToKeep int64 + + NumOrphansPerFile int + + OrphanDirectory string } // DefaultOptions returns the default options for IAVL. func DefaultOptions() Options { - return Options{} + return Options{ + NumOrphansPerFile: 100000, + } } diff --git a/orphandb.go b/orphandb.go new file mode 100644 index 0000000..fb714e1 --- /dev/null +++ b/orphandb.go @@ -0,0 +1,75 @@ +package iavl + +import ( + "fmt" + "io/fs" + "io/ioutil" + "os" + "path" + "strings" +) + +type orphanDB struct { + cache map[int64]map[string]int64 // key: version, value: orphans + directory string + numOrphansPerFile int +} + +func NewOrphanDB(opts *Options) *orphanDB { + return &orphanDB{ + cache: map[int64]map[string]int64{}, + directory: opts.OrphanDirectory, + numOrphansPerFile: opts.NumOrphansPerFile, + } +} + +func (o *orphanDB) SaveOrphans(version int64, orphans map[string]int64) error { + o.cache[version] = orphans + chunks := [][]string{{}} + for orphan := range orphans { + if len(chunks[len(chunks)-1]) == o.numOrphansPerFile { + chunks = append(chunks, []string{}) + } + chunks[len(chunks)-1] = append(chunks[len(chunks)-1], orphan) + } + dir := path.Join(o.directory, fmt.Sprintf("%d", version)) + os.RemoveAll(dir) + os.MkdirAll(dir, fs.ModePerm) + for i, chunk := range chunks { + f, err := os.Create(path.Join(dir, fmt.Sprintf("%d", i))) + if err != nil { + return err + } + f.WriteString(strings.Join(chunk, "\n")) + f.Close() + } + return nil +} + +func (o *orphanDB) GetOrphans(version int64) map[string]int64 { + if _, ok := o.cache[version]; !ok { + o.cache[version] = map[string]int64{} + dir := path.Join(o.directory, fmt.Sprintf("%d", version)) + files, err := ioutil.ReadDir(dir) + if err != nil { + // no orphans found + return o.cache[version] + } + for _, file := range files { + content, err := ioutil.ReadFile(path.Join(dir, file.Name())) + if err != nil { + return o.cache[version] + } + for _, orphan := range strings.Split(string(content), "\n") { + o.cache[version][orphan] = version + } + } + } + return o.cache[version] +} + +func (o *orphanDB) DeleteOrphans(version int64) error { + delete(o.cache, version) + dir := path.Join(o.directory, fmt.Sprintf("%d", version)) + return os.RemoveAll(dir) +}