Skip to content

Commit

Permalink
Merge pull request #19 from octu0/topic_fix_datafile-lost-on-merge-replication
Browse files Browse the repository at this point in the history

fix datafile lost on merge replication
  • Loading branch information
octu0 authored Apr 19, 2023
2 parents 8bf9d6b + 3e36f3d commit eec61f3
Show file tree
Hide file tree
Showing 13 changed files with 513 additions and 28 deletions.
2 changes: 1 addition & 1 deletion bitcask.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) error {
// SiftScan iterates over all keys in the database beginning with the given
// prefix, calling the function `f` for each key. If the KV pair is expired or
// the function returns true, that key is deleted from the database.
// If the function returns an error on any key, no further keys are processed,
// If the function returns an error on any key, no further keys are processed,
// no keys are deleted, and the first error is returned.
func (b *Bitcask) SiftScan(prefix []byte, f func(key []byte) (bool, error)) (err error) {
keysToDelete := art.New()
Expand Down
16 changes: 8 additions & 8 deletions bitcask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1891,19 +1891,19 @@ func TestRange(t *testing.T) {

func TestLocking(t *testing.T) {
testdir, err := os.MkdirTemp("", "bitcask")
if err != nil {
t.Fatalf("no error: %+v", err)
}
if err != nil {
t.Fatalf("no error: %+v", err)
}

db, err := Open(testdir)
if err != nil {
t.Fatalf("no error: %+v", err)
}
if err != nil {
t.Fatalf("no error: %+v", err)
}
defer db.Close()

if _, err = Open(testdir); err == nil {
t.Errorf("must error")
}
t.Errorf("must error")
}
}

func TestGetExpiredInsideFold(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions indexer/filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ const (
FilerByteSize int = datafile.FileIDByteSize + 8 + 8
)

// MergeFiler pairs a post-merge datafile ID with the pre-merge Filer
// (the old on-disk location of a value), so a replication destination
// can replay a merge: read the entry at Filer and rewrite it into the
// datafile identified by FileID.
type MergeFiler struct {
FileID datafile.FileID
Filer Filer
}

// Filer represents the location of the value on disk. This is used by the
// internal Adaptive Radix Tree to hold an in-memory structure mapping keys to
// locations on disk of where the value(s) can be read from.
Expand Down
43 changes: 28 additions & 15 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *merger) Merge(b *Bitcask, lim *priorate.Limiter) error {
}
defer snapshot.Destroy(lim)

temp, err := m.renewMergedDB(b, mergeFileIds, snapshot, lim)
temp, mergedFiler, err := m.renewMergedDB(b, mergeFileIds, snapshot, lim)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -81,6 +81,8 @@ func (m *merger) Merge(b *Bitcask, lim *priorate.Limiter) error {
return errors.WithStack(err)
}

b.repliEmit.EmitMerge(mergedFiler)

removeFileSlowly(removeMarkedFiles, lim)
return nil
}
Expand Down Expand Up @@ -186,20 +188,21 @@ func (m *merger) snapshotIndexer(b *Bitcask, lim *priorate.Limiter) (*snapshotTr
return st, nil
}

func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []datafile.FileID, st *snapshotTrie, lim *priorate.Limiter) (*mergeTempDB, error) {
func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []datafile.FileID, st *snapshotTrie, lim *priorate.Limiter) (*mergeTempDB, []indexer.MergeFiler, error) {
temp, err := openMergeTempDB(b.path, b.opt)
if err != nil {
return nil, errors.WithStack(err)
return nil, nil, errors.WithStack(err)
}

if err := temp.MergeDatafiles(b, mergeFileIds, st, lim); err != nil {
return nil, errors.WithStack(err)
mergedFiler, err := temp.MergeDatafiles(b, mergeFileIds, st, lim)
if err != nil {
return nil, nil, errors.WithStack(err)
}

if err := temp.SyncAndClose(); err != nil {
return nil, errors.WithStack(err)
return nil, nil, errors.WithStack(err)
}
return temp, nil
return temp, mergedFiler, nil
}

func (m *merger) moveMerged(b *Bitcask, lastFileID datafile.FileID, mergedDBPath string) ([]string, error) {
Expand Down Expand Up @@ -300,7 +303,7 @@ func (t *mergeTempDB) DB() *Bitcask {
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
func (t *mergeTempDB) MergeDatafiles(src *Bitcask, mergeFileIds []datafile.FileID, st *snapshotTrie, lim *priorate.Limiter) error {
func (t *mergeTempDB) MergeDatafiles(src *Bitcask, mergeFileIds []datafile.FileID, st *snapshotTrie, lim *priorate.Limiter) ([]indexer.MergeFiler, error) {
t.mdb.mu.Lock()
defer t.mdb.mu.Unlock()

Expand All @@ -317,19 +320,21 @@ func (t *mergeTempDB) MergeDatafiles(src *Bitcask, mergeFileIds []datafile.FileI
datafile.CopyTempThreshold(src.opt.CopyTempThreshold),
)
if err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
m[fileID] = df
}

if err := t.mergeDatafileLocked(st, m, lim); err != nil {
return errors.WithStack(err)
mergedFiler, err := t.mergeDatafileLocked(st, m, lim)
if err != nil {
return nil, errors.WithStack(err)
}
return nil
return mergedFiler, nil
}

func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileID]datafile.Datafile, lim *priorate.Limiter) error {
return st.ReadAll(func(data snapshotTrieData) error {
func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileID]datafile.Datafile, lim *priorate.Limiter) ([]indexer.MergeFiler, error) {
mergedFiler := make([]indexer.MergeFiler, 0, len(m))
if err := st.ReadAll(func(data snapshotTrieData) error {
filer := data.Value

df, ok := m[filer.FileID]
Expand Down Expand Up @@ -365,8 +370,16 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI
if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil {
return errors.WithStack(err)
}
mergedFiler = append(mergedFiler, indexer.MergeFiler{
FileID: t.mdb.curr.FileID(),
Filer: filer,
})

return nil
})
}); err != nil {
return nil, errors.WithStack(err)
}
return mergedFiler, nil
}

func (t *mergeTempDB) SyncAndClose() error {
Expand Down
99 changes: 98 additions & 1 deletion repli.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"time"

"github.com/octu0/priorate"
"github.com/pkg/errors"

"github.com/octu0/bitcaskdb/datafile"
Expand Down Expand Up @@ -155,7 +156,27 @@ func (d *repliDestination) datafileOpen(fileID datafile.FileID) (datafile.Datafi
datafile.CopyTempThreshold(d.b.opt.CopyTempThreshold),
)
if err != nil {
return nil, false, err
return nil, false, errors.WithStack(err)
}
return df, true, nil
}

// datafileOpenRead returns a datafile for fileID suitable for reading.
// It first reuses an already-open handle (the current write file or an
// entry in b.datafiles); otherwise it opens the file read-only.
// The bool result reports whether a NEW handle was opened here —
// presumably the caller is responsible for closing it in that case
// (reused handles stay owned by the Bitcask instance). TODO confirm.
func (d *repliDestination) datafileOpenRead(fileID datafile.FileID) (datafile.Datafile, bool, error) {
// Current write file: reuse, not newly opened.
if d.b.curr.FileID().Equal(fileID) {
return d.b.curr, false, nil
}
// Already-tracked read file: reuse, not newly opened.
if df, ok := d.b.datafiles[fileID]; ok {
return df, false, nil
}

// Not open yet: open read-only with the instance's configured options.
df, err := datafile.OpenReadonly(fileID, d.b.path,
datafile.RuntimeContext(d.b.opt.RuntimeContext),
datafile.FileMode(d.b.opt.FileFileModeBeforeUmask),
datafile.TempDir(d.b.opt.TempDir),
datafile.CopyTempThreshold(d.b.opt.CopyTempThreshold),
)
if err != nil {
return nil, false, errors.WithStack(err)
}
return df, true, nil
}
Expand All @@ -167,6 +188,82 @@ func (d *repliDestination) Delete(key []byte) error {
return d.b.deleteLocked(key)
}

// merge copies a single entry from src (at the location described by f)
// into dst, then updates the in-memory index (trie) to point the key at
// its new location. It returns the index and size of the entry written
// to dst. NOTE(review): callers appear to hold b.mu while calling this —
// confirm, since the trie is mutated here without local locking.
func (d *repliDestination) merge(dst, src datafile.Datafile, f indexer.Filer) (int64, int64, error) {
// Read the old entry at its recorded offset/size.
e, err := src.ReadAt(f.Index, f.Size)
if err != nil {
return 0, 0, errors.WithStack(err)
}
defer e.Close()

// Rewrite the entry into the destination datafile.
index, size, err := dst.Write(e.Key, e.Value, e.Expiry)
if err != nil {
return 0, 0, errors.WithStack(err)
}

// Repoint the key's index entry at the new file/offset/size.
newFiler := indexer.Filer{
FileID: dst.FileID(),
Index: index,
Size: size,
}
d.b.trie.Insert(e.Key, newFiler)
return index, size, nil
}

// mergeDatafile replays a source-side merge on this replication
// destination: for every MergeFiler it copies the entry from its old
// datafile (Filer.FileID) into the new datafile (FileID), registers the
// new files, then closes and deletes the old files. The whole operation
// runs under b.mu, so reads are blocked for its duration.
func (d *repliDestination) mergeDatafile(mergedFiler []indexer.MergeFiler) error {
d.b.mu.Lock()
defer d.b.mu.Unlock()

// Open (or reuse) every destination datafile up front so a failure
// aborts before any entries are rewritten.
newFiles := make(map[datafile.FileID]datafile.Datafile, len(mergedFiler))
for _, mf := range mergedFiler {
newDF, _, err := d.datafileOpen(mf.FileID)
if err != nil {
return errors.WithStack(err)
}
newFiles[mf.FileID] = newDF
}

// Open (or reuse) every source datafile read-only. The "newly opened"
// flag is discarded; all old files are closed below regardless.
oldFiles := make(map[datafile.FileID]datafile.Datafile, len(mergedFiler))
for _, mf := range mergedFiler {
oldDF, _, err := d.datafileOpenRead(mf.Filer.FileID)
if err != nil {
return errors.WithStack(err)
}
oldFiles[mf.Filer.FileID] = oldDF
}

// Copy each entry old -> new and repoint the index (see merge()).
for _, mf := range mergedFiler {
newDF := newFiles[mf.FileID]
oldDF := oldFiles[mf.Filer.FileID]
if _, _, err := d.merge(newDF, oldDF, mf.Filer); err != nil {
return errors.WithStack(err)
}
}
// Persist the new files and make them visible to readers.
for _, df := range newFiles {
if err := df.Sync(); err != nil {
return errors.WithStack(err)
}
d.b.datafiles[df.FileID()] = df
}

// Retire the old files: drop from the read map, close, and collect
// their paths for deletion.
removeFiles := make([]string, 0, len(oldFiles))
for _, df := range oldFiles {
delete(d.b.datafiles, df.FileID())
// NOTE(review): Close() error is ignored here; probably acceptable
// for read-only handles about to be deleted, but worth confirming.
df.Close()
removeFiles = append(removeFiles, df.Name())
}
// Delete with an unthrottled rate limiter (merge replay is not
// rate-limited, unlike the source-side merge path).
if err := removeFileSlowly(removeFiles, priorate.InfLimiter()); err != nil {
return errors.WithStack(err)
}
return nil
}

// Merge applies a replicated merge event to this destination by
// delegating to mergeDatafile. Any error is annotated with a stack
// trace; errors.WithStack returns nil for a nil error, so the success
// path is unchanged.
func (d *repliDestination) Merge(mergedFiler []indexer.MergeFiler) error {
return errors.WithStack(d.mergeDatafile(mergedFiler))
}

// newRepliDestination returns a repliDestination that applies
// replication events (insert/delete/merge) to the given Bitcask.
func newRepliDestination(b *Bitcask) *repliDestination {
return &repliDestination{b}
}
4 changes: 4 additions & 0 deletions repli/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (*noopEmitter) EmitCurrentFileID(fileID datafile.FileID) error {
return nil
}

// EmitMerge satisfies the Emitter interface; the no-op emitter
// discards merge events and always reports success.
func (*noopEmitter) EmitMerge(merged []indexer.MergeFiler) error {
return nil
}

func NewNoopEmitter() *noopEmitter {
return new(noopEmitter)
}
Expand Down
2 changes: 2 additions & 0 deletions repli/repli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Emitter interface {
EmitInsert(filer indexer.Filer) error
EmitDelete(key []byte) error
EmitCurrentFileID(datafile.FileID) error
EmitMerge([]indexer.MergeFiler) error
}

type Reciver interface {
Expand All @@ -39,4 +40,5 @@ type Destination interface {
LastFiles() []FileIDAndIndex
Insert(fileID datafile.FileID, index int64, checksum uint32, key []byte, r io.Reader, expiry time.Time) error
Delete(key []byte) error
Merge([]indexer.MergeFiler) error
}
Loading

0 comments on commit eec61f3

Please sign in to comment.