Skip to content

Commit

Permalink
Merge pull request #11 from octu0/v1.3.6
Browse files Browse the repository at this point in the history
v1.3.6
  • Loading branch information
octu0 authored Jun 30, 2022
2 parents 84418f4 + 34d1201 commit 17441ef
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
4 changes: 4 additions & 0 deletions indexer/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/octu0/bitcaskdb/runtime"
)

const (
FilerByte int = 4 + 8 + 8
)

// Filer represents the location of the value on disk. This is used by the
// internal Adaptive Radix Tree to hold an in-memory structure mapping keys to
// locations on disk of where the value(s) can be read from.
Expand Down
23 changes: 16 additions & 7 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
return errors.WithStack(err)
}

snapshot, err := m.snapshotIndexer(b)
snapshot, err := m.snapshotIndexer(b, lim)
if err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) {
return mergeFileIds, nil
}

func (m *merger) snapshotIndexer(b *Bitcask) (*snapshotTrie, error) {
func (m *merger) snapshotIndexer(b *Bitcask, lim *rate.Limiter) (*snapshotTrie, error) {
b.mu.RLock()
defer b.mu.RUnlock()

Expand All @@ -152,6 +152,15 @@ func (m *merger) snapshotIndexer(b *Bitcask) (*snapshotTrie, error) {
lastErr = errors.WithStack(err)
return false
}

r := lim.ReserveN(time.Now(), indexer.FilerByte)
if r.OK() != true {
return true
}

if d := r.Delay(); 0 < d {
time.Sleep(d)
}
return true
})
if lastErr != nil {
Expand Down Expand Up @@ -280,6 +289,11 @@ func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, st *snapshotTrie
defer t.mdb.mu.Unlock()

m := make(map[int32]datafile.Datafile, len(mergeFileIds))
defer func() {
for _, df := range m {
df.Close()
}
}()
for _, fileID := range mergeFileIds {
df, err := datafile.OpenReadonly(
datafile.RuntimeContext(src.opt.RuntimeContext),
Expand All @@ -293,11 +307,6 @@ func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, st *snapshotTrie
}
m[fileID] = df
}
defer func() {
for _, df := range m {
df.Close()
}
}()

if err := t.mergeDatafileLocked(st, m, lim); err != nil {
return errors.WithStack(err)
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package bitcaskdb

const (
AppName string = "bitcaskdb"
Version string = "1.3.5"
Version string = "1.3.6"
)

0 comments on commit 17441ef

Please sign in to comment.