From 92e8a2f29df96c36505a4ffb4f3c768864da2c86 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Thu, 30 Jun 2022 17:39:25 +0900 Subject: [PATCH 1/3] v1.3.6 --- version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.go b/version.go index 96d6a00..4a94cbd 100644 --- a/version.go +++ b/version.go @@ -2,5 +2,5 @@ package bitcaskdb const ( AppName string = "bitcaskdb" - Version string = "1.3.5" + Version string = "1.3.6" ) From f278c46e51b28e99dee3c5392527c03bfae729f9 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Thu, 30 Jun 2022 21:04:35 +0900 Subject: [PATCH 2/3] snapshotIndexer ratelimit --- indexer/codec.go | 4 ++++ merge.go | 13 +++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/indexer/codec.go b/indexer/codec.go index 533ba91..60be1af 100644 --- a/indexer/codec.go +++ b/indexer/codec.go @@ -11,6 +11,10 @@ import ( "github.com/octu0/bitcaskdb/runtime" ) +const ( + FilerByte int = 4 + 8 + 8 +) + // Filer represents the location of the value on disk. This is used by the // internal Adaptive Radix Tree to hold an in-memory structure mapping keys to // locations on disk of where the value(s) can be read from. diff --git a/merge.go b/merge.go index ac47ae3..263fd76 100644 --- a/merge.go +++ b/merge.go @@ -62,7 +62,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error { return errors.WithStack(err) } - snapshot, err := m.snapshotIndexer(b) + snapshot, err := m.snapshotIndexer(b, lim) if err != nil { return errors.WithStack(err) } @@ -137,7 +137,7 @@ func (m *merger) forwardCurrentDafafile(b *Bitcask) ([]int32, error) { return mergeFileIds, nil } -func (m *merger) snapshotIndexer(b *Bitcask) (*snapshotTrie, error) { +func (m *merger) snapshotIndexer(b *Bitcask, lim *rate.Limiter) (*snapshotTrie, error) { b.mu.RLock() defer b.mu.RUnlock() @@ -152,6 +152,15 @@ func (m *merger) snapshotIndexer(b *Bitcask) (*snapshotTrie, error) { lastErr = errors.WithStack(err) return false } + + r := lim.ReserveN(time.Now(), indexer.FilerByte) + if r.OK() != true { + return true + } + + if d := r.Delay(); 0 < d { + time.Sleep(d) + } return true }) if lastErr != nil { From 34d12011081e977d5adac64020aa20627fba3e87 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Thu, 30 Jun 2022 21:05:18 +0900 Subject: [PATCH 3/3] close all opened datafiles --- merge.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/merge.go b/merge.go index 263fd76..2ce5170 100644 --- a/merge.go +++ b/merge.go @@ -289,6 +289,11 @@ func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, st *snapshotTrie defer t.mdb.mu.Unlock() m := make(map[int32]datafile.Datafile, len(mergeFileIds)) + defer func() { + for _, df := range m { + df.Close() + } + }() for _, fileID := range mergeFileIds { df, err := datafile.OpenReadonly( datafile.RuntimeContext(src.opt.RuntimeContext), @@ -302,11 +307,6 @@ func (t *mergeTempDB) Merge(src *Bitcask, mergeFileIds []int32, st *snapshotTrie } m[fileID] = df } - defer func() { - for _, df := range m { - df.Close() - } - }() if err := t.mergeDatafileLocked(st, m, lim); err != nil { return errors.WithStack(err)