Skip to content

Commit

Permalink
Merge pull request #12 from octu0/v1.3.7
Browse files Browse the repository at this point in the history
v1.3.7
  • Loading branch information
octu0 authored Jul 1, 2022
2 parents 17441ef + 4ba0415 commit 8405558
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 71 deletions.
92 changes: 49 additions & 43 deletions bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ func (b *Bitcask) Close() error {
return errors.WithStack(err)
}

if err := b.close(); err != nil {
if err := b.closeLocked(); err != nil {
return errors.WithStack(err)
}
return nil
}

func (b *Bitcask) close() error {
func (b *Bitcask) closeLocked() error {
if err := b.saveIndexes(); err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -150,13 +150,51 @@ func (b *Bitcask) Get(key []byte) (io.ReadCloser, error) {
b.mu.RLock()
defer b.mu.RUnlock()

return b.get(key)
return b.getLocked(key)
}

// getLocked retrieves the entry stored under key.
// The caller must already hold b.mu (read or write lock).
//
// It returns ErrKeyNotFound when the key is absent from the index and
// ErrKeyExpired when the key exists but its TTL has elapsed. When
// opt.ValidateChecksum is enabled, the entry's checksum is verified
// before the entry is returned.
func (b *Bitcask) getLocked(key []byte) (*datafile.Entry, error) {
	value, found := b.trie.Search(key)
	if !found {
		return nil, ErrKeyNotFound
	}
	if b.isExpired(key) {
		return nil, ErrKeyExpired
	}

	filer := value.(indexer.Filer)

	// The entry lives either in the currently-open (writable) datafile
	// or in one of the older, read-only datafiles.
	var df datafile.Datafile
	if filer.FileID == b.curr.FileID() {
		df = b.curr
	} else {
		df = b.datafiles[filer.FileID]
	}

	e, err := df.ReadAt(filer.Index, filer.Size)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	if b.opt.ValidateChecksum {
		if err := e.Validate(b.opt.RuntimeContext); err != nil {
			return nil, errors.WithStack(err)
		}
	}

	return e, nil
}

// Has reports whether key currently exists in the database.
// It takes a shared (read) lock for the duration of the lookup.
func (b *Bitcask) Has(key []byte) bool {
	b.mu.RLock()
	exists := b.hasLocked(key)
	b.mu.RUnlock()
	return exists
}

func (b *Bitcask) hasLocked(key []byte) bool {
_, found := b.trie.Search(key)
if found != true {
return false
Expand Down Expand Up @@ -252,12 +290,12 @@ func (b *Bitcask) Delete(key []byte) error {
b.mu.Lock()
defer b.mu.Unlock()

return b.delete(key)
return b.deleteLocked(key)
}

// delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) delete(key []byte) error {
func (b *Bitcask) deleteLocked(key []byte) error {
_, _, err := b.put(key, nil, time.Time{})
if err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -314,7 +352,7 @@ func (b *Bitcask) Sift(f func(key []byte) (bool, error)) error {
defer b.mu.Unlock()

keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
b.deleteLocked(node.Key())
return true
})
return nil
Expand Down Expand Up @@ -413,8 +451,9 @@ func (b *Bitcask) SiftScan(prefix []byte, f func(key []byte) (bool, error)) (err

b.mu.Lock()
defer b.mu.Unlock()

keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
b.deleteLocked(node.Key())
return true
})
return
Expand Down Expand Up @@ -502,7 +541,7 @@ func (b *Bitcask) SiftRange(start, end []byte, f func(key []byte) (bool, error))
defer b.mu.Unlock()

keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
b.deleteLocked(node.Key())
return true
})

Expand Down Expand Up @@ -576,39 +615,6 @@ func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
return
}

// get retrieves the entry stored under the given key.
// Caller must hold b.mu (read or write lock).
//
// Returns ErrKeyNotFound when the key is absent from the index and
// ErrKeyExpired when the key exists but its TTL has elapsed.
func (b *Bitcask) get(key []byte) (*datafile.Entry, error) {
value, found := b.trie.Search(key)
if found != true {
return nil, ErrKeyNotFound
}
if b.isExpired(key) {
return nil, ErrKeyExpired
}

filer := value.(indexer.Filer)

// The entry lives either in the currently-open (writable) datafile
// or in one of the older, read-only datafiles.
var df datafile.Datafile
if filer.FileID == b.curr.FileID() {
df = b.curr
} else {
df = b.datafiles[filer.FileID]
}

e, err := df.ReadAt(filer.Index, filer.Size)
if err != nil {
return nil, errors.WithStack(err)
}

// Optional integrity check; skipped unless explicitly enabled in options.
if b.opt.ValidateChecksum {
if err := e.Validate(b.opt.RuntimeContext); err != nil {
return nil, errors.WithStack(err)
}
}

return e, nil
}

func (b *Bitcask) maybeRotate() error {
size := b.curr.Size()
if size < int64(b.opt.MaxDatafileSize) {
Expand Down Expand Up @@ -679,12 +685,12 @@ func (b *Bitcask) Reopen() error {
b.mu.Lock()
defer b.mu.Unlock()

return b.reopen()
return b.reopenLocked()
}

// reopen reloads a bitcask object with index and datafiles
// caller of this method should take care of locking
func (b *Bitcask) reopen() error {
func (b *Bitcask) reopenLocked() error {
datafiles, lastID, err := loadDatafiles(b.opt, b.path)
if err != nil {
return errors.WithStack(err)
Expand Down
44 changes: 22 additions & 22 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
m.mutex.Unlock()
}()

mergeFileIds, err := m.forwardCurrentDafafile(b)
lastFileID, mergeFileIds, err := m.forwardCurrentDafafile(b)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -75,7 +75,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
defer temp.Destroy(lim)

// Reduce b blocking time by performing b.mu.Lock/Unlock within merger.reopen()
removeMarkedFiles, err := m.reopen(b, temp)
removeMarkedFiles, err := m.reopen(b, temp, lastFileID)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -84,41 +84,41 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
return nil
}

func (m *merger) reopen(b *Bitcask, temp *mergeTempDB) ([]string, error) {
func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID int32) ([]string, error) {
// no reads and writes till we reopen
b.mu.Lock()
defer b.mu.Unlock()

if err := b.close(); err != nil {
if err := b.closeLocked(); err != nil {
// try recovery
if err2 := b.reopen(); err2 != nil {
if err2 := b.reopenLocked(); err2 != nil {
return nil, errors.Wrapf(err2, "failed close() / reopen() cause:%+v", err)
}
return nil, errors.Wrap(err, "failed to close()")
}

removeMarkedFiles, err := m.moveMerged(b, temp.DBPath())
removeMarkedFiles, err := m.moveMerged(b, lastFileID, temp.DBPath())
if err != nil {
return nil, errors.WithStack(err)
}

b.metadata.ReclaimableSpace = 0

// And finally reopen the database
if err := b.reopen(); err != nil {
if err := b.reopenLocked(); err != nil {
removeFileSlowly(removeMarkedFiles, nil)
return nil, errors.WithStack(err)
}
return removeMarkedFiles, nil
}

func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) {
func (m *merger) forwardCurrentDafafile(b *Bitcask) (int32, []int32, error) {
b.mu.Lock()
defer b.mu.Unlock()

currentFileID, err := b.closeCurrentFile()
if err != nil {
return nil, errors.WithStack(err)
return 0, nil, errors.WithStack(err)
}

mergeFileIds := make([]int32, 0, len(b.datafiles))
Expand All @@ -127,14 +127,14 @@ func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) {
}

if err := b.openWritableFile(currentFileID + 1); err != nil {
return nil, errors.WithStack(err)
return 0, nil, errors.WithStack(err)
}

sort.Slice(mergeFileIds, func(i, j int) bool {
return mergeFileIds[i] < mergeFileIds[j]
})

return mergeFileIds, nil
return currentFileID, mergeFileIds, nil
}

func (m *merger) snapshotIndexer(b *Bitcask, lim *rate.Limiter) (*snapshotTrie, error) {
Expand Down Expand Up @@ -175,7 +175,7 @@ func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32, st *snapshotTri
return nil, errors.WithStack(err)
}

if err := temp.Merge(b, mergeFileIds, st, lim); err != nil {
if err := temp.MergeDatafiles(b, mergeFileIds, st, lim); err != nil {
return nil, errors.WithStack(err)
}

Expand All @@ -185,10 +185,8 @@ func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32, st *snapshotTri
return temp, nil
}

func (m *merger) moveMerged(b *Bitcask, mergedDBPath string) ([]string, error) {
currentFileID := b.curr.FileID()

removeMarkedFiles, err := m.markArchive(b, currentFileID)
func (m *merger) moveMerged(b *Bitcask, lastFileID int32, mergedDBPath string) ([]string, error) {
removeMarkedFiles, err := m.markArchive(b, lastFileID)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -198,7 +196,7 @@ func (m *merger) moveMerged(b *Bitcask, mergedDBPath string) ([]string, error) {
return removeMarkedFiles, nil
}

func (m *merger) markArchive(b *Bitcask, currentFileID int32) ([]string, error) {
func (m *merger) markArchive(b *Bitcask, lastFileID int32) ([]string, error) {
files, err := os.ReadDir(b.path)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -222,7 +220,7 @@ func (m *merger) markArchive(b *Bitcask, currentFileID int32) ([]string, error)
if 0 < len(ids) {
fileID := ids[0]
// keep currentFileID or newer
if currentFileID <= fileID {
if lastFileID < fileID {
continue
}
}
Expand All @@ -249,6 +247,10 @@ func (m *merger) moveDBFiles(b *Bitcask, fromDBPath string) error {
if filename == lockfile {
continue
}
// keep current index
if filename == filerIndexFile || filename == ttlIndexFile {
continue
}

fromFile := filepath.Join(fromDBPath, filename)
toFile := filepath.Join(b.path, filename)
Expand Down Expand Up @@ -284,7 +286,7 @@ func (t *mergeTempDB) DB() *Bitcask {
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, st *snapshotTrie, lim *rate.Limiter) error {
func (t *mergeTempDB) MergeDatafiles(src *Bitcask, mergeFileIds []int32, st *snapshotTrie, lim *rate.Limiter) error {
t.mdb.mu.Lock()
defer t.mdb.mu.Unlock()

Expand Down Expand Up @@ -320,8 +322,6 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[int32]datafile

df, ok := m[filer.FileID]
if ok != true {
// if fileID was updated after start of merge operation, nothing to do
// markArchive() to keep the new FileID
return nil
}

Expand All @@ -335,7 +335,7 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[int32]datafile
if isExpiredFromTime(e.Expiry) {
return nil
}
if err := t.mdb.putAndIndexLocked(e.Key, e.Value, e.Expiry); err != nil {
if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil {
return errors.WithStack(err)
}

Expand Down
Loading

0 comments on commit 8405558

Please sign in to comment.