Skip to content

Commit

Permalink
Merge pull request #8 from octu0/v1.3.4
Browse files Browse the repository at this point in the history
v1.3.4
  • Loading branch information
octu0 authored Jun 27, 2022
2 parents 09669e1 + 5e247da commit 693f0e3
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 37 deletions.
80 changes: 48 additions & 32 deletions bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ func (b *Bitcask) putAndIndex(key []byte, value io.Reader, expiry time.Time) err
func (b *Bitcask) putAndIndexLocked(key []byte, value io.Reader, expiry time.Time) error {
index, size, err := b.put(key, value, expiry)
if err != nil {
return err
return errors.WithStack(err)
}

if b.opt.Sync {
if err := b.curr.Sync(); err != nil {
return err
return errors.WithStack(err)
}
}

Expand Down Expand Up @@ -260,8 +260,9 @@ func (b *Bitcask) Delete(key []byte) error {
func (b *Bitcask) delete(key []byte) error {
_, _, err := b.put(key, nil, time.Time{})
if err != nil {
return err
return errors.WithStack(err)
}

v, found := b.trie.Search(key)
if found {
f := v.(indexer.Filer)
Expand All @@ -282,9 +283,10 @@ func (b *Bitcask) delete(key []byte) error {
// deleted from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned.
func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) {
func (b *Bitcask) Sift(f func(key []byte) (bool, error)) error {
keysToDelete := art.New()

var lastErr error
b.mu.RLock()
b.trie.ForEach(func(node art.Node) bool {
nodeKey := node.Key()
Expand All @@ -293,55 +295,65 @@ func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) {
return true
}
var shouldDelete bool
if shouldDelete, err = f(nodeKey); err != nil {
shouldDelete, err := f(nodeKey)
if err != nil {
lastErr = errors.WithStack(err)
return false
} else if shouldDelete {
}
if shouldDelete {
keysToDelete.Insert(nodeKey, true)
}
return true
})
b.mu.RUnlock()
if err != nil {
return
if lastErr != nil {
return errors.WithStack(lastErr)
}

b.mu.Lock()
defer b.mu.Unlock()

keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
return true
})
return
return nil
}

// DeleteAll deletes all the keys. If an I/O error occurs the error is returned.
func (b *Bitcask) DeleteAll() (err error) {
func (b *Bitcask) DeleteAll() error {
b.mu.Lock()
defer b.mu.Unlock()

var lastErr error
b.trie.ForEach(func(node art.Node) bool {
nodeKey := node.Key()
_, _, err = b.put(nodeKey, nil, time.Time{})
if err != nil {
if _, _, err := b.put(nodeKey, nil, time.Time{}); err != nil {
lastErr = errors.WithStack(err)
return false
}

filer, _ := b.trie.Search(nodeKey)
b.metadata.ReclaimableSpace += filer.(indexer.Filer).Size
return true
})
b.trie = art.New()
b.ttlIndex = art.New()

return
if lastErr != nil {
return errors.WithStack(lastErr)
}
return nil
}

// Scan performs a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error is returned.
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) error {
b.mu.RLock()
defer b.mu.RUnlock()

var lastErr error
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
nodeKey := node.Key()
// Skip the root node
Expand All @@ -354,12 +366,16 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
return true
}

if err = f(nodeKey); err != nil {
if err := f(nodeKey); err != nil {
lastErr = errors.WithStack(err)
return false
}
return true
})
return
if lastErr != nil {
return errors.WithStack(lastErr)
}
return nil
}

// SiftScan iterates over all keys in the database beginning with the given
Expand Down Expand Up @@ -581,12 +597,12 @@ func (b *Bitcask) get(key []byte) (*datafile.Entry, error) {

e, err := df.ReadAt(filer.Index, filer.Size)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}

if b.opt.ValidateChecksum {
if err := e.Validate(b.opt.RuntimeContext); err != nil {
return nil, err
return nil, errors.WithStack(err)
}
}

Expand All @@ -609,7 +625,7 @@ func (b *Bitcask) maybeRotate() error {
}

if err := b.saveIndexes(); err != nil {
return err
return errors.WithStack(err)
}

return nil
Expand Down Expand Up @@ -652,7 +668,7 @@ func (b *Bitcask) openWritableFile(fileID int32) error {
datafile.CopyTempThreshold(b.opt.CopyTempThreshold),
)
if err != nil {
return err
return errors.WithStack(err)
}
b.curr = curr
return nil
Expand All @@ -671,11 +687,11 @@ func (b *Bitcask) Reopen() error {
func (b *Bitcask) reopen() error {
datafiles, lastID, err := loadDatafiles(b.opt, b.path)
if err != nil {
return err
return errors.WithStack(err)
}
t, ttlIndex, err := loadIndexes(b, datafiles, lastID)
if err != nil {
return err
return errors.WithStack(err)
}

curr, err := datafile.Open(
Expand All @@ -687,7 +703,7 @@ func (b *Bitcask) reopen() error {
datafile.CopyTempThreshold(b.opt.CopyTempThreshold),
)
if err != nil {
return err
return errors.WithStack(err)
}

b.trie = t
Expand Down Expand Up @@ -779,7 +795,7 @@ func loadDatafiles(opt *option, path string) (map[int32]datafile.Datafile, int32
datafile.CopyTempThreshold(opt.CopyTempThreshold),
)
if err != nil {
return nil, 0, err
return nil, 0, errors.WithStack(err)
}
datafiles[id] = d
}
Expand All @@ -797,18 +813,18 @@ func loadDatafiles(opt *option, path string) (map[int32]datafile.Datafile, int32
func loadIndexes(b *Bitcask, datafiles map[int32]datafile.Datafile, lastID int32) (art.Tree, art.Tree, error) {
t, found, err := b.indexer.Load(filepath.Join(b.path, filerIndexFile))
if err != nil {
return nil, nil, err
return nil, nil, errors.WithStack(err)
}
ttlIndex, _, err := b.ttlIndexer.Load(filepath.Join(b.path, ttlIndexFile))
if err != nil {
return nil, nil, err
return nil, nil, errors.WithStack(err)
}
if found && b.metadata.IndexUpToDate {
return t, ttlIndex, nil
}
if found {
if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID], nil); err != nil {
return nil, ttlIndex, err
return nil, ttlIndex, errors.WithStack(err)
}
return t, ttlIndex, nil
}
Expand All @@ -824,7 +840,7 @@ func loadIndexes(b *Bitcask, datafiles map[int32]datafile.Datafile, lastID int32
for _, fileID := range fileIds {
df := datafiles[fileID]
if err := loadIndexFromDatafile(t, ttlIndex, df, nil); err != nil {
return nil, ttlIndex, err
return nil, ttlIndex, errors.WithStack(err)
}
}
return t, ttlIndex, nil
Expand All @@ -842,7 +858,7 @@ func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df datafile.Datafile,
if errors.Is(err, io.EOF) {
break
}
return err
return errors.WithStack(err)
}
defer e.Close()

Expand Down Expand Up @@ -916,7 +932,7 @@ func calcDirSize(path string) (int64, error) {
size := int64(0)
if err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
return errors.WithStack(err)
}
if info.IsDir() != true {
size += info.Size()
Expand Down Expand Up @@ -983,14 +999,14 @@ func Open(path string, funcs ...OptionFunc) (*Bitcask, error) {

ok, err := bitcask.flock.TryLock()
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
if ok != true {
return nil, ErrDatabaseLocked
}

if err := bitcask.Reopen(); err != nil {
return nil, err
return nil, errors.WithStack(err)
}

if err := repliReciver.Start(bitcask.repliDestination(), opt.RepliServerIP, opt.RepliServerPort); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions bitcask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,12 +1530,16 @@ func TestSift(t *testing.T) {
err = db.Sift(func(key []byte) (bool, error) {
return false, ErrMockError
})
assert.Equal(ErrMockError, err)
if errors.Is(err, ErrMockError) != true {
t.Errorf("resolv my error ErrMockError")
}

err = db.SiftScan([]byte("fo"), func(key []byte) (bool, error) {
return true, ErrMockError
})
assert.Equal(ErrMockError, err)
if errors.Is(err, ErrMockError) != true {
t.Errorf("resolv my error ErrMockError")
}
})
}
func TestSiftScan(t *testing.T) {
Expand Down Expand Up @@ -1650,7 +1654,9 @@ func TestScan(t *testing.T) {
return ErrMockError
})
assert.Error(err)
assert.Equal(ErrMockError, err)
if errors.Is(err, ErrMockError) != true {
t.Errorf("resolv my error: ErrMockErr")
}
})
}
func TestSiftRange(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
const (
mergeDirPattern string = "merge*"
removeParkPrefix string = ".mark.%s"
snapshotTrieFile string = "snapshot_trie"
)

const (
Expand Down Expand Up @@ -218,7 +219,7 @@ func (m *merger) markArchive(b *Bitcask, currentFileID int32) ([]string, error)
}

fromFile := filepath.Join(b.path, filename)
toFile := filepath.Join(b.path, fmt.Sprintf(".mark.%s", filename))
toFile := filepath.Join(b.path, fmt.Sprintf(removeParkPrefix, filename))
if err := os.Rename(fromFile, toFile); err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -371,6 +372,7 @@ func (t *mergeTempDB) Destroy(lim *rate.Limiter) {
return
}
removeFileSlowly(files, lim)
os.RemoveAll(t.tempDir)
t.destroyed = true
}

Expand Down Expand Up @@ -460,7 +462,7 @@ func finalizeSnapshotTrie(st *snapshotTrie) {
}

func openSnapshotTrie(tempDir string) (*snapshotTrie, error) {
f, err := os.CreateTemp(tempDir, "snapshot_trie")
f, err := os.CreateTemp(tempDir, snapshotTrieFile)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down

0 comments on commit 693f0e3

Please sign in to comment.