Skip to content

Commit

Permalink
Merge pull request #5 from octu0/v1.3.1
Browse files Browse the repository at this point in the history
v1.3.1
octu0 authored Jun 24, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 4099670 + abe93f7 commit 80da258
Showing 5 changed files with 98 additions and 52 deletions.
72 changes: 45 additions & 27 deletions bitcask.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"io"
"math"
"os"
"path/filepath"
"sort"
@@ -14,6 +15,7 @@ import (
"github.com/gofrs/flock"
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"golang.org/x/time/rate"

"github.com/octu0/bitcaskdb/datafile"
"github.com/octu0/bitcaskdb/indexer"
@@ -700,7 +702,15 @@ func (b *Bitcask) reopen() error {
// and deleted keys removed. Duplicate key/value pairs are also removed.
// Call this function periodically to reclaim disk space.
func (b *Bitcask) Merge() error {
return b.merger.Merge(b)
return b.merger.Merge(b, rate.NewLimiter(rate.Inf, math.MaxInt))
}

// MergeWithWaitLimit runs a merge like Merge does, but hands the
// caller-supplied limiter lim to the merger instead of an unlimited one.
//
// A nil lim is replaced with an unlimited limiter, the same fallback that
// loadIndexFromDatafile applies. Without it, the merge steps that call
// lim.ReserveN directly would dereference a nil pointer.
//
// It returns whatever error the merger reports.
func (b *Bitcask) MergeWithWaitLimit(lim *rate.Limiter) error {
	if lim == nil {
		lim = rate.NewLimiter(rate.Inf, math.MaxInt)
	}
	return b.merger.Merge(b, lim)
}

// MergeWithWaitLimitByBytesPerSecond runs a merge with a limiter derived
// from bytesPerSecond: the value is used both as the limiter's rate and as
// its burst size. It returns whatever error the merger reports.
//
// NOTE(review): a reservation larger than the burst is reported as not OK,
// and the visible call sites skip the wait in that case. Confirm that
// entries bigger than bytesPerSecond (and non-positive values) are meant to
// proceed unthrottled.
func (b *Bitcask) MergeWithWaitLimitByBytesPerSecond(bytesPerSecond int) error {
	perSecond := rate.Limit(float64(bytesPerSecond))
	limiter := rate.NewLimiter(perSecond, bytesPerSecond)
	return b.merger.Merge(b, limiter)
}

// saveIndex saves index and ttl_index currently in RAM to disk
@@ -781,17 +791,6 @@ func loadDatafiles(opt *option, path string) (map[int32]datafile.Datafile, int32
return datafiles, lastID, nil
}

// getSortedDatafiles returns the datafiles held in the map as a slice
// ordered by ascending FileID. The input map is not modified.
func getSortedDatafiles(datafiles map[int32]datafile.Datafile) []datafile.Datafile {
	sorted := make([]datafile.Datafile, 0, len(datafiles))
	for id := range datafiles {
		sorted = append(sorted, datafiles[id])
	}

	// Map iteration order is unspecified, so establish the order explicitly.
	byFileID := func(a, b int) bool {
		return sorted[a].FileID() < sorted[b].FileID()
	}
	sort.Slice(sorted, byFileID)
	return sorted
}

// loadIndexes loads index from disk to memory. If index is not available or partially available (last bitcask process crashed)
// then it iterates over the last datafile and constructs the index.
// The ttl_index is also constructed here along with the normal index.
@@ -808,22 +807,34 @@ func loadIndexes(b *Bitcask, datafiles map[int32]datafile.Datafile, lastID int32
return t, ttlIndex, nil
}
if found {
if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID]); err != nil {
if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID], nil); err != nil {
return nil, ttlIndex, err
}
return t, ttlIndex, nil
}

sortedDatafiles := getSortedDatafiles(datafiles)
for _, df := range sortedDatafiles {
if err := loadIndexFromDatafile(t, ttlIndex, df); err != nil {
fileIds := make([]int32, 0, len(datafiles))
for _, df := range datafiles {
fileIds = append(fileIds, df.FileID())
}
sort.Slice(fileIds, func(i, j int) bool {
return fileIds[i] < fileIds[j]
})

for _, fileID := range fileIds {
df := datafiles[fileID]
if err := loadIndexFromDatafile(t, ttlIndex, df, nil); err != nil {
return nil, ttlIndex, err
}
}
return t, ttlIndex, nil
}

func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df datafile.Datafile) error {
func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df datafile.Datafile, lim *rate.Limiter) error {
if lim == nil {
lim = rate.NewLimiter(rate.Inf, math.MaxInt)
}

index := int64(0)
for {
e, err := df.Read()
@@ -835,22 +846,29 @@ func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df datafile.Datafile)
}
defer e.Close()

if e.ValueSize == 0 {
if 0 < e.ValueSize {
t.Insert(e.Key, indexer.Filer{
FileID: df.FileID(),
Index: index,
Size: e.TotalSize,
})
if e.Expiry.IsZero() != true {
ttlIndex.Insert(e.Key, e.Expiry)
}
index += e.TotalSize
} else {
// Tombstone value (deleted key)
t.Delete(e.Key)
index += e.TotalSize
continue
}

t.Insert(e.Key, indexer.Filer{
FileID: df.FileID(),
Index: index,
Size: e.TotalSize,
})
if e.Expiry.IsZero() != true {
ttlIndex.Insert(e.Key, e.Expiry)
r := lim.ReserveN(time.Now(), int(e.TotalSize))
if r.OK() != true {
continue
}
if d := r.Delay(); 0 < d {
time.Sleep(d)
}
index += e.TotalSize
}
return nil
}
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -3,8 +3,10 @@ module github.com/octu0/bitcaskdb
go 1.18

require (
git.mills.io/prologic/bitcask v1.0.2
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81
github.com/gofrs/flock v0.8.0
github.com/juju/ratelimit v1.0.1
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.16.0
github.com/octu0/bp v1.2.0
@@ -15,10 +17,8 @@ require (
)

require (
git.mills.io/prologic/bitcask v1.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
@@ -28,6 +28,5 @@ require (
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -126,11 +126,11 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -201,6 +201,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -303,6 +305,7 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -665,12 +668,12 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
64 changes: 45 additions & 19 deletions merge.go
Original file line number Diff line number Diff line change
@@ -6,9 +6,11 @@ import (
goruntime "runtime"
"sort"
"sync"
"time"

"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"golang.org/x/time/rate"

"github.com/octu0/bitcaskdb/datafile"
"github.com/octu0/bitcaskdb/indexer"
@@ -30,7 +32,7 @@ func (m *merger) isMerging() bool {
return m.merging
}

func (m *merger) Merge(b *Bitcask) error {
func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
if m.isMerging() {
return errors.WithStack(ErrMergeInProgress)
}
@@ -50,7 +52,7 @@ func (m *merger) Merge(b *Bitcask) error {
return errors.WithStack(err)
}

temp, err := m.renewMergedDB(b, mergeFileIds)
temp, err := m.renewMergedDB(b, mergeFileIds, lim)
if err != nil {
return errors.WithStack(err)
}
@@ -68,7 +70,7 @@ func (m *merger) Merge(b *Bitcask) error {
return errors.Wrap(err, "failed to close()")
}

if err := m.moveMerged(b, temp.DBPath()); err != nil {
if err := m.moveMerged(b, temp.DBPath(), lim); err != nil {
return errors.WithStack(err)
}

@@ -106,13 +108,13 @@ func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) {
return mergeFileIds, nil
}

func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32) (*mergeTempDB, error) {
func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32, lim *rate.Limiter) (*mergeTempDB, error) {
temp, err := openMergeTempDB(b.path, b.opt)
if err != nil {
return nil, errors.WithStack(err)
}

if err := temp.Merge(b, mergeFileIds); err != nil {
if err := temp.Merge(b, mergeFileIds, lim); err != nil {
return nil, errors.WithStack(err)
}

@@ -122,10 +124,10 @@ func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []int32) (*mergeTempDB,
return temp, nil
}

func (m *merger) moveMerged(b *Bitcask, mergedDBPath string) error {
func (m *merger) moveMerged(b *Bitcask, mergedDBPath string, lim *rate.Limiter) error {
currentFileID := b.curr.FileID()

if err := m.removeArchive(b, currentFileID); err != nil {
if err := m.removeArchive(b, currentFileID, lim); err != nil {
return errors.WithStack(err)
}
if err := m.moveDBFiles(b, mergedDBPath); err != nil {
@@ -134,7 +136,7 @@ func (m *merger) moveMerged(b *Bitcask, mergedDBPath string) error {
return nil
}

func (m *merger) removeArchive(b *Bitcask, currentFileID int32) error {
func (m *merger) removeArchive(b *Bitcask, currentFileID int32, lim *rate.Limiter) error {
files, err := os.ReadDir(b.path)
if err != nil {
return errors.WithStack(err)
@@ -155,16 +157,31 @@ func (m *merger) removeArchive(b *Bitcask, currentFileID int32) error {

if 0 < len(ids) {
fileID := ids[0]
// keep current
if currentFileID == fileID {
// keep currentFileID or newer
if currentFileID <= fileID {
continue
}
}

path := filepath.Join(b.path, filename)
stat, err := os.Stat(path)
if err != nil {
return errors.WithStack(err)
}
filesize := stat.Size()

if err := os.Remove(path); err != nil {
return errors.WithStack(err)
}

r := lim.ReserveN(time.Now(), int(filesize))
if r.OK() != true {
continue
}

if d := r.Delay(); 0 < d {
time.Sleep(d)
}
}
return nil
}
@@ -213,13 +230,12 @@ func (t *mergeTempDB) DB() *Bitcask {
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32) error {
func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, lim *rate.Limiter) error {
t.mdb.mu.Lock()
defer t.mdb.mu.Unlock()

m := make(map[int32]datafile.Datafile, len(mergeFileIds))
datafiles := make([]datafile.Datafile, len(mergeFileIds))
for i, fileID := range mergeFileIds {
for _, fileID := range mergeFileIds {
df, err := datafile.OpenReadonly(
datafile.RuntimeContext(src.opt.RuntimeContext),
datafile.Path(src.path),
@@ -230,28 +246,28 @@ func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32) error {
if err != nil {
return errors.WithStack(err)
}
datafiles[i] = df
m[fileID] = df
}
defer func() {
for _, df := range datafiles {
for _, df := range m {
df.Close()
}
}()

for _, df := range datafiles {
if err := loadIndexFromDatafile(t.mdb.trie, t.mdb.ttlIndex, df); err != nil {
for _, fileID := range mergeFileIds {
df := m[fileID]
if err := loadIndexFromDatafile(t.mdb.trie, t.mdb.ttlIndex, df, lim); err != nil {
return errors.WithStack(err)
}
}

if err := t.mergeDatafileLocked(m); err != nil {
if err := t.mergeDatafileLocked(m, lim); err != nil {
return errors.WithStack(err)
}
return nil
}

func (t *mergeTempDB) mergeDatafileLocked(m map[int32]datafile.Datafile) error {
func (t *mergeTempDB) mergeDatafileLocked(m map[int32]datafile.Datafile, lim *rate.Limiter) error {
var lastErr error
t.mdb.trie.ForEach(func(node art.Node) bool {
filer := node.Value().(indexer.Filer)
@@ -270,6 +286,16 @@ func (t *mergeTempDB) mergeDatafileLocked(m map[int32]datafile.Datafile) error {
lastErr = errors.WithStack(err)
return false
}

r := lim.ReserveN(time.Now(), int(e.TotalSize))
if r.OK() != true {
return true
}

if d := r.Delay(); 0 < d {
time.Sleep(d)
}

return true
})
if lastErr != nil {
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
@@ -2,5 +2,5 @@ package bitcaskdb

const (
AppName string = "bitcaskdb"
Version string = "1.3.0"
Version string = "1.3.1"
)

0 comments on commit 80da258

Please sign in to comment.