fixed a case where keys written during Merge() are lost (the data itself still exists on disk)
octu0 committed Jul 1, 2022
1 parent c5c0b1e commit 4ba0415
Showing 2 changed files with 97 additions and 26 deletions.
44 changes: 22 additions & 22 deletions merge.go
@@ -57,7 +57,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
m.mutex.Unlock()
}()

- mergeFileIds, err := m.forwardCurrentDafafile(b)
+ lastFileID, mergeFileIds, err := m.forwardCurrentDafafile(b)
if err != nil {
return errors.WithStack(err)
}
@@ -75,7 +75,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
defer temp.Destroy(lim)

// Reduce b blocking time by performing b.mu.Lock/Unlock within merger.reopen()
- removeMarkedFiles, err := m.reopen(b, temp)
+ removeMarkedFiles, err := m.reopen(b, temp, lastFileID)
if err != nil {
return errors.WithStack(err)
}
@@ -84,41 +84,41 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
return nil
}

- func (m *merger) reopen(b *Bitcask, temp *mergeTempDB) ([]string, error) {
+ func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID int32) ([]string, error) {
// no reads and writes till we reopen
b.mu.Lock()
defer b.mu.Unlock()

- if err := b.close(); err != nil {
+ if err := b.closeLocked(); err != nil {
// try recovery
- if err2 := b.reopen(); err2 != nil {
+ if err2 := b.reopenLocked(); err2 != nil {
return nil, errors.Wrapf(err2, "failed close() / reopen() cause:%+v", err)
}
return nil, errors.Wrap(err, "failed to close()")
}

- removeMarkedFiles, err := m.moveMerged(b, temp.DBPath())
+ removeMarkedFiles, err := m.moveMerged(b, lastFileID, temp.DBPath())
if err != nil {
return nil, errors.WithStack(err)
}

b.metadata.ReclaimableSpace = 0

// And finally reopen the database
- if err := b.reopen(); err != nil {
+ if err := b.reopenLocked(); err != nil {
removeFileSlowly(removeMarkedFiles, nil)
return nil, errors.WithStack(err)
}
return removeMarkedFiles, nil
}
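
A note on the b.close()/b.closeLocked() and b.reopen()/b.reopenLocked() renames above: reopen() runs with b.mu already held, so it has to call variants that assume the lock is taken instead of taking it again. A minimal, self-contained sketch of that Go convention (names and bodies are assumptions for illustration; the real methods are not part of this diff):

package main

import "sync"

type DB struct {
	mu   sync.RWMutex
	open bool
}

// Close is the public API: it acquires the lock itself.
func (d *DB) Close() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.closeLocked()
}

// closeLocked requires d.mu to be held by the caller; taking it
// again here would deadlock.
func (d *DB) closeLocked() error {
	d.open = false
	return nil
}

func main() {
	d := &DB{open: true}
	_ = d.Close()
}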

- func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) {
+ func (m *merger) forwardCurrentDafafile(b *Bitcask) (int32, []int32, error) {
b.mu.Lock()
defer b.mu.Unlock()

currentFileID, err := b.closeCurrentFile()
if err != nil {
- return nil, errors.WithStack(err)
+ return 0, nil, errors.WithStack(err)
}

mergeFileIds := make([]int32, 0, len(b.datafiles))
@@ -127,14 +127,14 @@ func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) {
}

if err := b.openWritableFile(currentFileID + 1); err != nil {
- return nil, errors.WithStack(err)
+ return 0, nil, errors.WithStack(err)
}

sort.Slice(mergeFileIds, func(i, j int) bool {
return mergeFileIds[i] < mergeFileIds[j]
})

- return mergeFileIds, nil
+ return currentFileID, mergeFileIds, nil
}
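
A small, runnable sketch (hypothetical helper, not the library's API) of the bookkeeping forwardCurrentDafafile now exposes: the datafile that was current when the merge began becomes lastFileID and is merged along with the older files, while writes issued during the merge land in lastFileID+1 and beyond:

package main

import (
	"fmt"
	"sort"
)

func forward(existing []int32, current int32) (lastFileID int32, mergeFileIds []int32) {
	// assumption: the just-closed current file is merged together with the older ones
	mergeFileIds = append(append([]int32{}, existing...), current)
	sort.Slice(mergeFileIds, func(i, j int) bool { return mergeFileIds[i] < mergeFileIds[j] })
	return current, mergeFileIds
}

func main() {
	last, ids := forward([]int32{3, 4}, 5)
	fmt.Println(last, ids) // 5 [3 4 5]; new writes go to file 6+
}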

func (m *merger) snapshotIndexer(b *Bitcask, lim *rate.Limiter) (*snapshotTrie, error) {
@@ -175,7 +175,7 @@ func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32, st *snapshotTri
return nil, errors.WithStack(err)
}

- if err := temp.Merge(b, mergeFileIds, st, lim); err != nil {
+ if err := temp.MergeDatafiles(b, mergeFileIds, st, lim); err != nil {
return nil, errors.WithStack(err)
}

@@ -185,10 +185,8 @@ func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32, st *snapshotTri
return temp, nil
}

- func (m *merger) moveMerged(b *Bitcask, mergedDBPath string) ([]string, error) {
- currentFileID := b.curr.FileID()
-
- removeMarkedFiles, err := m.markArchive(b, currentFileID)
+ func (m *merger) moveMerged(b *Bitcask, lastFileID int32, mergedDBPath string) ([]string, error) {
+ removeMarkedFiles, err := m.markArchive(b, lastFileID)
if err != nil {
return nil, errors.WithStack(err)
}
@@ -198,7 +196,7 @@ func (m *merger) moveMerged(b *Bitcask, mergedDBPath string) ([]string, error) {
return removeMarkedFiles, nil
}

- func (m *merger) markArchive(b *Bitcask, currentFileID int32) ([]string, error) {
+ func (m *merger) markArchive(b *Bitcask, lastFileID int32) ([]string, error) {
files, err := os.ReadDir(b.path)
if err != nil {
return nil, errors.WithStack(err)
@@ -222,7 +220,7 @@ func (m *merger) markArchive(b *Bitcask, currentFileID int32) ([]string, error)
if 0 < len(ids) {
fileID := ids[0]
// keep fileIDs newer than lastFileID (files written while the merge was running)
- if currentFileID <= fileID {
+ if lastFileID < fileID {
continue
}
}
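
The boundary change above is the core of the fix: the old code compared against b.curr.FileID(), read back at archive time after writers may have rolled the active file forward, while the new code pins the boundary to the lastFileID captured when the merge began. A runnable sketch with made-up file IDs:

package main

import "fmt"

// keepOld mirrors the previous check (boundary read back at archive time).
func keepOld(currentFileID, fileID int32) bool { return currentFileID <= fileID }

// keepNew mirrors the fixed check (boundary captured at merge start).
func keepNew(lastFileID, fileID int32) bool { return lastFileID < fileID }

func main() {
	// Merge begins with file 5 current (lastFileID=5); file 6 opens for new
	// writes, and traffic during the merge rolls the active file on to 7.
	for _, id := range []int32{5, 6, 7} {
		fmt.Printf("file %d: old keep=%v fixed keep=%v\n", id, keepOld(7, id), keepNew(5, id))
	}
	// file 6 shows the bug: the old check archives it even though it holds
	// keys written during the merge; the fixed check keeps it.
}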
@@ -249,6 +247,10 @@ func (m *merger) moveDBFiles(b *Bitcask, fromDBPath string) error {
if filename == lockfile {
continue
}
+ // keep current index
+ if filename == filerIndexFile || filename == ttlIndexFile {
+ continue
+ }

fromFile := filepath.Join(fromDBPath, filename)
toFile := filepath.Join(b.path, filename)
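
The skip added above keeps the live database's index files from being overwritten by the merge-temp DB's copies, which would drop index entries for keys written while the merge ran. A minimal sketch of the filter; the filenames are assumptions standing in for the real lockfile, filerIndexFile, and ttlIndexFile values:

package main

import "fmt"

func main() {
	lockfile, filerIndexFile, ttlIndexFile := "lock", "index", "ttl_index" // assumed names
	for _, filename := range []string{"lock", "index", "ttl_index", "000000042.data"} {
		if filename == lockfile || filename == filerIndexFile || filename == ttlIndexFile {
			fmt.Printf("skip %s (the live DB keeps its own copy)\n", filename)
			continue
		}
		fmt.Printf("move %s from the temp DB into the live DB dir\n", filename)
	}
}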
@@ -284,7 +286,7 @@ func (t *mergeTempDB) DB() *Bitcask {
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
- func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, st *snapshotTrie, lim *rate.Limiter) error {
+ func (t *mergeTempDB) MergeDatafiles(src *Bitcask, mergeFileIds []int32, st *snapshotTrie, lim *rate.Limiter) error {
t.mdb.mu.Lock()
defer t.mdb.mu.Unlock()

@@ -320,8 +322,6 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[int32]datafile

df, ok := m[filer.FileID]
if ok != true {
- // if fileID was updated after start of merge operation, nothing to do
- // markArchive() to keep the new FileID
return nil
}

@@ -335,7 +335,7 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[int32]datafile
if isExpiredFromTime(e.Expiry) {
return nil
}
- if err := t.mdb.putAndIndexLocked(e.Key, e.Value, e.Expiry); err != nil {
+ if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil {
return errors.WithStack(err)
}

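Switching from putAndIndexLocked to a bare put means the merge-temp DB no longer maintains an index for the rewritten data, which is consistent with moveDBFiles leaving the live index files in place. A toy illustration of the failure mode the commit title hints at ("data exists" but the key is gone), with plain maps standing in for the real index files:

package main

import "fmt"

func main() {
	// Index contents mid-merge (illustrative values only).
	tempIndex := map[string]bool{"a": true, "b": true}            // rebuilt from the merge snapshot
	liveIndex := map[string]bool{"a": true, "b": true, "c": true} // "c" was written during the merge

	// Before the fix, the temp DB's index files replaced the live ones:
	if !tempIndex["c"] {
		fmt.Println(`before: "c" vanishes from the index although its data exists on disk`)
	}
	// After the fix, the live index files stay put:
	if liveIndex["c"] {
		fmt.Println(`after: "c" is still resolvable`)
	}
}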
79 changes: 75 additions & 4 deletions merge_test.go
@@ -1,9 +1,12 @@
package bitcaskdb

import (
"io/ioutil"
"context"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
@@ -16,7 +19,7 @@ func TestMerge(t *testing.T) {

assert := assert.New(t)

- testdir, err := ioutil.TempDir("", "bitcask")
+ testdir, err := os.MkdirTemp("", "bitcask")
assert.NoError(err)

t.Run("Setup", func(t *testing.T) {
@@ -76,7 +79,7 @@ func TestMergeErrors(t *testing.T) {
assert := assert.New(t)

t.Run("RemoveDatabaseDirectory", func(t *testing.T) {
- testdir, err := ioutil.TempDir("", "bitcask")
+ testdir, err := os.MkdirTemp("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)

@@ -93,7 +96,7 @@ func TestMergeErrors(t *testing.T) {
func TestMergeLockingAfterMerge(t *testing.T) {
assert := assert.New(t)

- testdir, err := ioutil.TempDir("", "bitcask")
+ testdir, err := os.MkdirTemp("", "bitcask")
assert.NoError(err)

db, err := Open(testdir)
@@ -110,3 +113,71 @@ func TestMergeLockingAfterMerge(t *testing.T) {
_, err = Open(testdir)
assert.Error(err)
}

+ func TestMergeGoroutine(t *testing.T) {
+ testdir, err := os.MkdirTemp("", "bitcask")
+ if err != nil {
+ t.Fatalf("no error! %+v", err)
+ }
+ db, err := Open(testdir, WithMaxDatafileSize(32))
+ if err != nil {
+ t.Fatalf("no error! %+v", err)
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ wg := new(sync.WaitGroup)
+ for i := 0; i < 100; i += 1 {
+ wg.Add(1)
+ go func(ctx context.Context, w *sync.WaitGroup, id int) {
+ defer w.Done()
+
+ j := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ j += 1
+
+ key := []byte(fmt.Sprintf("%d-%d", id, j))
+ if err := db.PutBytes(key, key); err != nil {
+ t.Errorf("no error %+v", err)
+ }
+ if db.Has(key) != true {
+ t.Errorf("%s exists!", key)
+ }
+ e, err := db.Get(key)
+ if err != nil {
+ t.Errorf("no error! %+v", err)
+ }
+ defer e.Close()
+ }
+ }(ctx, wg, i)
+ }
+
+ wg.Add(1)
+ go func(ctx context.Context, w *sync.WaitGroup) {
+ defer w.Done()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ t.Logf("merge start")
+ if err := db.Merge(); err != nil {
+ t.Errorf("no error! %+v", err)
+ }
+ t.Logf("merged")
+ }
+ }(ctx, wg)
+
+ <-ctx.Done()
+ wg.Wait()
+ db.Close()
+ }
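The new TestMergeGoroutine drives 100 concurrent writers against a database that merges in a tight loop for ten seconds, asserting that every freshly written key stays readable. Running it with the race detector, for example go test -race -run TestMergeGoroutine, should reproduce the pre-fix loss on the parent commit, timing permitting.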
