Skip to content

Commit

Permalink
Proper locking
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Mar 1, 2023
1 parent 78df9f1 commit 00d605d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 24 deletions.
7 changes: 4 additions & 3 deletions immutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package iavl
import (
"fmt"
"strings"
"sync"

dbm "github.com/tendermint/tm-db"
)
Expand Down Expand Up @@ -230,7 +231,7 @@ func (t *ImmutableTree) Iterate(fn func(key []byte, value []byte) bool) (bool, e
return false, nil
}

itr, err := t.Iterator(nil, nil, true)
itr, err := t.Iterator(nil, nil, true, nil)
defer itr.Close()
if err != nil {
return false, err
Expand All @@ -245,7 +246,7 @@ func (t *ImmutableTree) Iterate(fn func(key []byte, value []byte) bool) (bool, e
}

// Iterator returns an iterator over the immutable tree.
func (t *ImmutableTree) Iterator(start, end []byte, ascending bool) (dbm.Iterator, error) {
func (t *ImmutableTree) Iterator(start, end []byte, ascending bool, mtx *sync.RWMutex) (dbm.Iterator, error) {
if !t.skipFastStorageUpgrade {
isFastCacheEnabled, err := t.IsFastCacheEnabled()
if err != nil {
Expand All @@ -256,7 +257,7 @@ func (t *ImmutableTree) Iterator(start, end []byte, ascending bool) (dbm.Iterato
return NewFastIterator(start, end, ascending, t.ndb), nil
}
}
return NewIterator(start, end, ascending, t), nil
return NewIterator(start, end, ascending, t, mtx), nil
}

// IterateRange makes a callback for all nodes with key between start and end non-inclusive.
Expand Down
37 changes: 25 additions & 12 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package iavl
import (
"bytes"
"errors"
"sync"

dbm "github.com/tendermint/tm-db"
)
Expand Down Expand Up @@ -76,19 +77,19 @@ func (nodes *delayedNodes) length() int {
// 1. If it is not an delayed node (node.delayed == false) it immediately returns it.
//
// A. If the `node` is a branch node:
// 1. If the traversal is postorder, then append the current node to the t.delayedNodes,
// with `delayed` set to false. This makes the current node returned *after* all the children
// are traversed, without being expanded.
// 2. Append the traversable children nodes into the `delayedNodes`, with `delayed` set to true. This
// makes the children nodes to be traversed, and expanded with their respective children.
// 3. If the traversal is preorder, (with the children to be traversed already pushed to the
// `delayedNodes`), returns the current node.
// 4. Call `traversal.next()` to further traverse through the `delayedNodes`.
// 1. If the traversal is postorder, then append the current node to the t.delayedNodes,
// with `delayed` set to false. This makes the current node returned *after* all the children
// are traversed, without being expanded.
// 2. Append the traversable children nodes into the `delayedNodes`, with `delayed` set to true. This
// makes the children nodes to be traversed, and expanded with their respective children.
// 3. If the traversal is preorder, (with the children to be traversed already pushed to the
// `delayedNodes`), returns the current node.
// 4. Call `traversal.next()` to further traverse through the `delayedNodes`.
//
// B. If the `node` is a leaf node, it will be returned without expand, by the following process:
// 1. If the traversal is postorder, the current node will be append to the `delayedNodes` with `delayed`
// set to false, and immediately returned at the subsequent call of `traversal.next()` at the last line.
// 2. If the traversal is preorder, the current node will be returned.
// 1. If the traversal is postorder, the current node will be append to the `delayedNodes` with `delayed`
// set to false, and immediately returned at the subsequent call of `traversal.next()` at the last line.
// 2. If the traversal is preorder, the current node will be returned.
func (t *traversal) next() (*Node, error) {
// End of traversal.
if t.delayedNodes.length() == 0 {
Expand Down Expand Up @@ -179,18 +180,24 @@ type Iterator struct {
err error

t *traversal

mtx *sync.RWMutex
}

var _ dbm.Iterator = (*Iterator)(nil)

// Returns a new iterator over the immutable tree. If the tree is nil, the iterator will be invalid.
func NewIterator(start, end []byte, ascending bool, tree *ImmutableTree) dbm.Iterator {
func NewIterator(start, end []byte, ascending bool, tree *ImmutableTree, mtx *sync.RWMutex) dbm.Iterator {
iter := &Iterator{
start: start,
end: end,
mtx: mtx,
}

if tree == nil {
if mtx != nil {
mtx.Unlock()
}
iter.err = errIteratorNilTreeGiven
} else {
iter.valid = true
Expand Down Expand Up @@ -232,6 +239,9 @@ func (iter *Iterator) Next() {
if node == nil || err != nil {
iter.t = nil
iter.valid = false
if iter.mtx != nil {
iter.mtx.Unlock()
}
return
}

Expand All @@ -247,6 +257,9 @@ func (iter *Iterator) Next() {
func (iter *Iterator) Close() error {
iter.t = nil
iter.valid = false
if iter.mtx != nil {
iter.mtx.Unlock()
}
return iter.err
}

Expand Down
36 changes: 27 additions & 9 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type MutableTree struct {
ndb *nodeDB
skipFastStorageUpgrade bool // If true, the tree will work like no fast storage and always not upgrade fast storage

mtx sync.Mutex
mtx sync.RWMutex
}

// NewMutableTree returns a new tree with the specified cache size and datastore.
Expand Down Expand Up @@ -67,6 +67,8 @@ func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastSto
// IsEmpty returns whether or not the tree has any keys. Only trees that are
// not empty can be saved.
func (tree *MutableTree) IsEmpty() bool {
tree.mtx.RLock()
defer tree.mtx.RUnlock()
return tree.ImmutableTree.Size() == 0
}

Expand Down Expand Up @@ -130,6 +132,8 @@ func (tree *MutableTree) prepareOrphansSlice() []*Node {
// to slices stored within IAVL. It returns true when an existing value was
// updated, while false means it was a new key.
func (tree *MutableTree) Set(key, value []byte) (updated bool, err error) {
tree.mtx.Lock()
defer tree.mtx.Unlock()
var orphaned []*Node
orphaned, updated, err = tree.set(key, value)
if err != nil {
Expand All @@ -145,6 +149,8 @@ func (tree *MutableTree) Set(key, value []byte) (updated bool, err error) {
// Get returns the value of the specified key if it exists, or nil otherwise.
// The returned value must not be modified, since it may point to data stored within IAVL.
func (tree *MutableTree) Get(key []byte) ([]byte, error) {
tree.mtx.Lock()
defer tree.mtx.Unlock()
if tree.root == nil {
return nil, nil
}
Expand Down Expand Up @@ -177,6 +183,9 @@ func (tree *MutableTree) Import(version int64) (*Importer, error) {
// Iterate iterates over all keys of the tree. The keys and values must not be modified,
// since they may point to data stored within IAVL. Returns true if stopped by callnack, false otherwise
func (tree *MutableTree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool, err error) {
tree.mtx.Lock()
defer tree.mtx.Unlock()

if tree.root == nil {
return false, nil
}
Expand Down Expand Up @@ -217,7 +226,7 @@ func (tree *MutableTree) Iterator(start, end []byte, ascending bool) (dbm.Iterat
}
}

return tree.ImmutableTree.Iterator(start, end, ascending)
return tree.ImmutableTree.Iterator(start, end, ascending, &tree.mtx)
}

func (tree *MutableTree) set(key []byte, value []byte) (orphans []*Node, updated bool, err error) {
Expand Down Expand Up @@ -319,6 +328,8 @@ func (tree *MutableTree) recursiveSet(node *Node, key []byte, value []byte, orph
// Remove removes a key from the working tree. The given key byte slice should not be modified
// after this call, since it may point to data stored inside IAVL.
func (tree *MutableTree) Remove(key []byte) ([]byte, bool, error) {
tree.mtx.Lock()
defer tree.mtx.Unlock()
val, orphaned, removed, err := tree.remove(key)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -708,7 +719,7 @@ func (tree *MutableTree) enableFastStorageAndCommitLocked() error {
func (tree *MutableTree) enableFastStorageAndCommit() error {
var err error

itr := NewIterator(nil, nil, true, tree.ImmutableTree)
itr := NewIterator(nil, nil, true, tree.ImmutableTree, nil)
defer itr.Close()
var upgradedFastNodes uint64
for ; itr.Valid(); itr.Next() {
Expand Down Expand Up @@ -873,6 +884,8 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
}

if tree.VersionExists(version) {
tree.mtx.Lock()
defer tree.mtx.Unlock()
// If the version already exists, return an error as we're attempting to overwrite.
// However, the same hash means idempotent (i.e. no-op).
existingHash, err := tree.ndb.getRoot(version)
Expand Down Expand Up @@ -902,12 +915,13 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
return nil, version, fmt.Errorf("version %d was already saved to different hash %X (existing hash %X)", version, newHash, existingHash)
}

tree.mtx.Lock()
defer tree.mtx.Unlock()

if v, err := tree.commitVersion(version, false); err != nil {
return nil, v, err
}

tree.mtx.Lock()
defer tree.mtx.Unlock()
tree.version = version
tree.versions[version] = true

Expand Down Expand Up @@ -1055,6 +1069,9 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
// DeleteVersions deletes a series of versions from the MutableTree.
// Deprecated: please use DeleteVersionsRange instead.
func (tree *MutableTree) DeleteVersions(versions ...int64) error {
tree.mtx.Lock()
defer tree.mtx.Unlock()

logger.Debug("DELETING VERSIONS: %v\n", versions)

if len(versions) == 0 {
Expand Down Expand Up @@ -1088,6 +1105,9 @@ func (tree *MutableTree) DeleteVersions(versions ...int64) error {
// An error is returned if any single version has active readers.
// All writes happen in a single batch with a single commit.
func (tree *MutableTree) DeleteVersionsRange(fromVersion, toVersion int64) error {
tree.mtx.Lock()
defer tree.mtx.Unlock()

if err := tree.ndb.DeleteVersionsRange(fromVersion, toVersion); err != nil {
return err
}
Expand All @@ -1096,8 +1116,6 @@ func (tree *MutableTree) DeleteVersionsRange(fromVersion, toVersion int64) error
return err
}

tree.mtx.Lock()
defer tree.mtx.Unlock()
for version := fromVersion; version < toVersion; version++ {
delete(tree.versions, version)
}
Expand All @@ -1109,6 +1127,8 @@ func (tree *MutableTree) DeleteVersionsRange(fromVersion, toVersion int64) error
// longer be accessed.
func (tree *MutableTree) DeleteVersion(version int64) error {
logger.Debug("DELETE VERSION: %d\n", version)
tree.mtx.Lock()
defer tree.mtx.Unlock()

if err := tree.deleteVersion(version); err != nil {
return err
Expand All @@ -1118,8 +1138,6 @@ func (tree *MutableTree) DeleteVersion(version int64) error {
return err
}

tree.mtx.Lock()
defer tree.mtx.Unlock()
delete(tree.versions, version)
return nil
}
Expand Down

0 comments on commit 00d605d

Please sign in to comment.