Skip to content

Commit

Permalink
Merge pull request #14 from octu0/v1.4.1
Browse files Browse the repository at this point in the history
v1.4.1
  • Loading branch information
octu0 authored Jul 13, 2022
2 parents e6c58cb + 198a45f commit 5cc4dee
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 30 deletions.
3 changes: 3 additions & 0 deletions datafile/datafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (df *defaultDatafile) ReadAtHeader(index int64) (*Header, EOFType, error) {

h, err := d.DecodeHeader()
if err != nil {
if errors.Is(err, io.EOF) {
return nil, IsEOF, nil
}
return nil, IsEOF, errors.WithStack(err)
}

Expand Down
79 changes: 53 additions & 26 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
defer temp.Destroy(lim)

// Reduce b blocking time by performing b.mu.Lock/Unlock within merger.reopen()
removeMarkedFiles, err := m.reopen(b, temp, lastFileID)
removeMarkedFiles, err := m.reopen(b, temp, lastFileID, lim)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -84,11 +84,28 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error {
return nil
}

func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID datafile.FileID) ([]string, error) {
// tellSaveIndexCostLocked charges lim with the estimated byte cost of
// persisting both in-memory indexes (the filer trie and the TTL index)
// before the database is closed. Must be called with b.mu held (see the
// "Locked" suffix; reopen acquires b.mu before invoking it).
func (m *merger) tellSaveIndexCostLocked(b *Bitcask, lim *rate.Limiter) {
	// NOTE(review): assumes each TTL entry serializes to 8 bytes — confirm
	// against the indexer's on-disk format.
	lim.ReserveN(time.Now(), b.trie.Size()*indexer.FilerByteSize)
	lim.ReserveN(time.Now(), b.ttlIndex.Size()*8)
}

// tellLoadIndexCostLocked charges lim with the estimated byte cost of
// re-loading both in-memory indexes (the filer trie and the TTL index)
// when the database is reopened. Must be called with b.mu held (see the
// "Locked" suffix; reopen acquires b.mu before invoking it).
func (m *merger) tellLoadIndexCostLocked(b *Bitcask, lim *rate.Limiter) {
	// NOTE(review): assumes each TTL entry serializes to 8 bytes — confirm
	// against the indexer's on-disk format.
	lim.ReserveN(time.Now(), b.trie.Size()*indexer.FilerByteSize)
	lim.ReserveN(time.Now(), b.ttlIndex.Size()*8)
}

func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID datafile.FileID, lim *rate.Limiter) ([]string, error) {
// no reads and writes till we reopen
b.mu.Lock()
defer b.mu.Unlock()

m.tellSaveIndexCostLocked(b, lim)
if err := b.closeLocked(); err != nil {
// try recovery
if err2 := b.reopenLocked(); err2 != nil {
Expand All @@ -104,6 +121,7 @@ func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID datafile.FileI

b.metadata.ReclaimableSpace = 0

m.tellLoadIndexCostLocked(b, lim)
// And finally reopen the database
if err := b.reopenLocked(); err != nil {
removeFileSlowly(removeMarkedFiles, nil)
Expand Down Expand Up @@ -148,18 +166,16 @@ func (m *merger) snapshotIndexer(b *Bitcask, lim *rate.Limiter) (*snapshotTrie,

var lastErr error
b.trie.ForEach(func(node art.Node) bool {
if err := st.Write(node.Key(), node.Value().(indexer.Filer)); err != nil {
lastErr = errors.WithStack(err)
return false
}

r := lim.ReserveN(time.Now(), indexer.FilerByteSize)
if r.OK() != true {
return true
if r.OK() {
if d := r.Delay(); 0 < d {
time.Sleep(d)
}
}

if d := r.Delay(); 0 < d {
time.Sleep(d)
if err := st.Write(node.Key(), node.Value().(indexer.Filer)); err != nil {
lastErr = errors.WithStack(err)
return false
}
return true
})
Expand Down Expand Up @@ -320,6 +336,13 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI
return nil
}

rr := lim.ReserveN(time.Now(), int(filer.Size))
if rr.OK() {
if d := rr.Delay(); 0 < d {
time.Sleep(d)
}
}

e, err := df.ReadAt(filer.Index, filer.Size)
if err != nil {
return errors.WithStack(err)
Expand All @@ -330,17 +353,16 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI
if isExpiredFromTime(e.Expiry) {
return nil
}
if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil {
return errors.WithStack(err)
}

r := lim.ReserveN(time.Now(), int(e.TotalSize))
if r.OK() != true {
return nil
rw := lim.ReserveN(time.Now(), int(e.TotalSize))
if rw.OK() {
if d := rw.Delay(); 0 < d {
time.Sleep(d)
}
}

if d := r.Delay(); 0 < d {
time.Sleep(d)
if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil {
return errors.WithStack(err)
}
return nil
})
Expand Down Expand Up @@ -506,17 +528,22 @@ func removeFileSlowly(files []string, lim *rate.Limiter) error {
}

func truncate(path string, size int64, lim *rate.Limiter) {
truncateCount := (size - 1) / defaultTruncateThreshold
threshold := int64(defaultTruncateThreshold)
if lim.Limit() < rate.Inf && lim.Limit() < math.MaxInt {
threshold = int64(lim.Limit())
}

truncateCount := (size - 1) / threshold
for i := int64(0); i < truncateCount; i += 1 {
nextSize := defaultTruncateThreshold * (truncateCount - i)
os.Truncate(path, nextSize)

r := lim.ReserveN(time.Now(), int(nextSize))
if r.OK() != true {
continue
}
if d := r.Delay(); 0 < d {
time.Sleep(d)
if r.OK() {
if d := r.Delay(); 0 < d {
time.Sleep(d)
}
}

os.Truncate(path, nextSize)
}
}
6 changes: 5 additions & 1 deletion repli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (s *repliSource) LastIndex(fileID datafile.FileID) int64 {
if s.b.curr.FileID().Equal(fileID) {
return s.b.curr.Size()
}
return s.b.datafiles[fileID].Size()
df, ok := s.b.datafiles[fileID]
if ok != true {
return -1
}
return df.Size()
}

func (s *repliSource) Header(fileID datafile.FileID, index int64) (*datafile.Header, datafile.EOFType, error) {
Expand Down
24 changes: 22 additions & 2 deletions repli/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand
})
return
}
header, isEOF, err := src.Header(req.FileID, req.Index)
header, eofType, err := src.Header(req.FileID, req.Index)
if err != nil {
e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{
Size: -1,
Expand All @@ -560,9 +560,19 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand
return
}

// file found but empty
if header == nil && eofType == datafile.IsEOF {
e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{
Size: 0,
EOF: bool(eofType),
Err: "",
})
return
}

e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{
Size: header.TotalSize,
EOF: bool(isEOF),
EOF: bool(eofType),
Err: "",
})
}
Expand Down Expand Up @@ -1088,6 +1098,11 @@ func (r *streamReciver) reqDiffData(conn *nats.Conn, dst Destination, fileID dat
return errors.WithStack(err)
}

// file found but empty
if size == 0 && isEOF {
return nil
}

entry, err := r.reqFetchData(conn, fileID, lastIndex, size)
if err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -1116,8 +1131,13 @@ func (r *streamReciver) requestBehindData(client *nats.Conn, dst Destination, re
if err != nil {
return errors.Wrapf(err, "failed request current file index: fileID=%d", f.FileID)
}

requestedFileIds[f.FileID] = struct{}{} // mark

if lastIndex < 0 {
continue // not found
}

if f.Index < lastIndex {
// server has new index
if err := r.reqDiffData(client, dst, f.FileID, f.Index); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package bitcaskdb

const (
AppName string = "bitcaskdb"
Version string = "1.4.0"
Version string = "1.4.1"
)

0 comments on commit 5cc4dee

Please sign in to comment.