From 9512643a69b31dcb4082c9a93bbefa00d03fb901 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Wed, 13 Jul 2022 21:15:53 +0900 Subject: [PATCH 1/4] v1.4.1 --- version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.go b/version.go index 828651a..27c5002 100644 --- a/version.go +++ b/version.go @@ -2,5 +2,5 @@ package bitcaskdb const ( AppName string = "bitcaskdb" - Version string = "1.4.0" + Version string = "1.4.1" ) From d916b31776a6de174e62f6e2cc221fc3620245a8 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Wed, 13 Jul 2022 21:17:53 +0900 Subject: [PATCH 2/4] reserved I/O size for reopen(closeIndex/saveIndex) operations when rate.Limiter was shared --- merge.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/merge.go b/merge.go index 7eac08d..4d519e7 100644 --- a/merge.go +++ b/merge.go @@ -75,7 +75,7 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error { defer temp.Destroy(lim) // Reduce b blocking time by performing b.mu.Lock/Unlock within merger.reopen() - removeMarkedFiles, err := m.reopen(b, temp, lastFileID) + removeMarkedFiles, err := m.reopen(b, temp, lastFileID, lim) if err != nil { return errors.WithStack(err) } @@ -84,11 +84,28 @@ func (m *merger) Merge(b *Bitcask, lim *rate.Limiter) error { return nil } -func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID datafile.FileID) ([]string, error) { +func (m *merger) tellSaveIndexCostLocked(b *Bitcask, lim *rate.Limiter) { + saveCostFiler := b.trie.Size() * indexer.FilerByteSize + saveCostTTL := b.ttlIndex.Size() * 8 + + lim.ReserveN(time.Now(), saveCostFiler) + lim.ReserveN(time.Now(), saveCostTTL) +} + +func (m *merger) tellLoadIndexCostLocked(b *Bitcask, lim *rate.Limiter) { + loadCostFiler := b.trie.Size() * indexer.FilerByteSize + loadCostTTL := b.ttlIndex.Size() * 8 + + lim.ReserveN(time.Now(), loadCostFiler) + lim.ReserveN(time.Now(), loadCostTTL) +} + +func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID datafile.FileID, lim *rate.Limiter) ([]string, error) { // no reads and writes till we reopen b.mu.Lock() defer b.mu.Unlock() + m.tellSaveIndexCostLocked(b, lim) if err := b.closeLocked(); err != nil { // try recovery if err2 := b.reopenLocked(); err2 != nil { @@ -104,6 +121,7 @@ func (m *merger) reopen(b *Bitcask, temp *mergeTempDB, lastFileID datafile.FileI b.metadata.ReclaimableSpace = 0 + m.tellLoadIndexCostLocked(b, lim) // And finally reopen the database if err := b.reopenLocked(); err != nil { removeFileSlowly(removeMarkedFiles, nil) From 42e5ddef5d0cb0543f6247638195a128c4e45fa2 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Wed, 13 Jul 2022 21:18:40 +0900 Subject: [PATCH 3/4] reserve before I/O operation --- merge.go | 57 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/merge.go b/merge.go index 4d519e7..86f5c1f 100644 --- a/merge.go +++ b/merge.go @@ -166,18 +166,16 @@ func (m *merger) snapshotIndexer(b *Bitcask, lim *rate.Limiter) (*snapshotTrie, var lastErr error b.trie.ForEach(func(node art.Node) bool { - if err := st.Write(node.Key(), node.Value().(indexer.Filer)); err != nil { - lastErr = errors.WithStack(err) - return false - } - r := lim.ReserveN(time.Now(), indexer.FilerByteSize) - if r.OK() != true { - return true + if r.OK() { + if d := r.Delay(); 0 < d { + time.Sleep(d) + } } - if d := r.Delay(); 0 < d { - time.Sleep(d) + if err := st.Write(node.Key(), node.Value().(indexer.Filer)); err != nil { + lastErr = errors.WithStack(err) + return false } return true }) @@ -338,6 +336,13 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI return nil } + rr := lim.ReserveN(time.Now(), int(filer.Size)) + if rr.OK() { + if d := rr.Delay(); 0 < d { + time.Sleep(d) + } + } + e, err := df.ReadAt(filer.Index, filer.Size) if err != nil { return errors.WithStack(err) @@ -348,17 +353,16 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI if isExpiredFromTime(e.Expiry) { return nil } - if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil { - return errors.WithStack(err) - } - r := lim.ReserveN(time.Now(), int(e.TotalSize)) - if r.OK() != true { - return nil + rw := lim.ReserveN(time.Now(), int(e.TotalSize)) + if rw.OK() { + if d := rw.Delay(); 0 < d { + time.Sleep(d) + } } - if d := r.Delay(); 0 < d { - time.Sleep(d) + if _, _, err := t.mdb.put(e.Key, e.Value, e.Expiry); err != nil { + return errors.WithStack(err) } return nil }) @@ -524,17 +528,22 @@ func removeFileSlowly(files []string, lim *rate.Limiter) error { } func truncate(path string, size int64, lim *rate.Limiter) { - truncateCount := (size - 1) / defaultTruncateThreshold + threshold := int64(defaultTruncateThreshold) + if lim.Limit() < rate.Inf && lim.Limit() < math.MaxInt { + threshold = int64(lim.Limit()) + } + + truncateCount := (size - 1) / threshold for i := int64(0); i < truncateCount; i += 1 { nextSize := defaultTruncateThreshold * (truncateCount - i) - os.Truncate(path, nextSize) r := lim.ReserveN(time.Now(), int(nextSize)) - if r.OK() != true { - continue - } - if d := r.Delay(); 0 < d { - time.Sleep(d) + if r.OK() { + if d := r.Delay(); 0 < d { + time.Sleep(d) + } } + + os.Truncate(path, nextSize) } } From 198a45f88bab90976e3e48b52c8f7d3fc0677ba9 Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Wed, 13 Jul 2022 21:51:44 +0900 Subject: [PATCH 4/4] empty database replication --- datafile/datafile.go | 3 +++ repli.go | 6 +++++- repli/stream.go | 24 ++++++++++++++++++++++-- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/datafile/datafile.go b/datafile/datafile.go index 91d508b..82055e8 100644 --- a/datafile/datafile.go +++ b/datafile/datafile.go @@ -137,6 +137,9 @@ func (df *defaultDatafile) ReadAtHeader(index int64) (*Header, EOFType, error) { h, err := d.DecodeHeader() if err != nil { + if errors.Is(err, io.EOF) { + return nil, IsEOF, nil + } return nil, IsEOF, errors.WithStack(err) } diff --git a/repli.go b/repli.go index a1ea1d0..dc96906 100644 --- a/repli.go +++ b/repli.go @@ -46,7 +46,11 @@ func (s *repliSource) LastIndex(fileID datafile.FileID) int64 { if s.b.curr.FileID().Equal(fileID) { return s.b.curr.Size() } - return s.b.datafiles[fileID].Size() + df, ok := s.b.datafiles[fileID] + if ok != true { + return -1 + } + return df.Size() } func (s *repliSource) Header(fileID datafile.FileID, index int64) (*datafile.Header, datafile.EOFType, error) { diff --git a/repli/stream.go b/repli/stream.go index 787c21d..52b2767 100644 --- a/repli/stream.go +++ b/repli/stream.go @@ -550,7 +550,7 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand }) return } - header, isEOF, err := src.Header(req.FileID, req.Index) + header, eofType, err := src.Header(req.FileID, req.Index) if err != nil { e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{ Size: -1, @@ -560,9 +560,19 @@ func (e *streamEmitter) replyFetchSize(conn *nats.Conn, src Source) nats.MsgHand return } + // file found but empty + if header == nil && eofType == datafile.IsEOF { + e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{ + Size: 0, + EOF: bool(eofType), + Err: "", + }) + return + } + e.publishReplyFetchSize(conn, msg.Reply, ResponseFetchSize{ Size: header.TotalSize, - EOF: bool(isEOF), + EOF: bool(eofType), Err: "", }) } @@ -1088,6 +1098,11 @@ func (r *streamReciver) reqDiffData(conn *nats.Conn, dst Destination, fileID dat return errors.WithStack(err) } + // file found but empty + if size == 0 && isEOF { + return nil + } + entry, err := r.reqFetchData(conn, fileID, lastIndex, size) if err != nil { return errors.WithStack(err) @@ -1116,8 +1131,13 @@ func (r *streamReciver) requestBehindData(client *nats.Conn, dst Destination, re if err != nil { return errors.Wrapf(err, "failed request current file index: fileID=%d", f.FileID) } + requestedFileIds[f.FileID] = struct{}{} // mark + if lastIndex < 0 { + continue // not found + } + if f.Index < lastIndex { // server has new index if err := r.reqDiffData(client, dst, f.FileID, f.Index); err != nil {