Skip to content

Commit

Permalink
logdb: added a test.
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Jun 13, 2019
1 parent a23b7ba commit 93840fb
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*safe-to-delete*
drummer-mt-out.txt
out.txt
coverage.out
*.pprof
*.swp
*.tmp
Expand Down
13 changes: 0 additions & 13 deletions internal/logdb/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,13 @@
package logdb

import (
"os"
"path/filepath"
"reflect"
"testing"

"github.com/lni/dragonboat/internal/utils/leaktest"
"github.com/lni/dragonboat/raftio"
)

func getDirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err == nil && !info.IsDir() {
size += info.Size()
}
return nil
})
return size, err
}

func TestCompactionTaskCanBeCreated(t *testing.T) {
defer leaktest.AfterTest(t)()
p := newCompactions()
Expand Down
4 changes: 2 additions & 2 deletions internal/logdb/kv/leveldb/kv_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (r *KV) BulkRemoveEntries(fk []byte, lk []byte) error {
}

func (r *KV) deleteRange(fk []byte, lk []byte) error {
wb := levigo.NewWriteBatch()
wb := r.GetWriteBatch(nil)
it := r.db.NewIterator(r.ro)
defer it.Close()
for it.Seek(fk); it.Valid(); it.Next() {
Expand All @@ -210,7 +210,7 @@ func (r *KV) deleteRange(fk []byte, lk []byte) error {
if err := it.GetError(); err != nil {
return err
}
return r.db.Write(r.wo, wb)
return r.CommitWriteBatch(wb)
}

// CompactEntries ...
Expand Down
95 changes: 95 additions & 0 deletions internal/logdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"math"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/lni/dragonboat/internal/utils/leaktest"
"github.com/lni/dragonboat/raftio"
Expand All @@ -29,6 +32,17 @@ const (
RDBTestDirectory = "rdb_test_dir_safe_to_delete"
)

func getDirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err == nil && !info.IsDir() {
size += info.Size()
}
return nil
})
return size, err
}

func getNewTestDB(dir string, lldir string, batched bool) raftio.ILogDB {
d := filepath.Join(RDBTestDirectory, dir)
lld := filepath.Join(RDBTestDirectory, lldir)
Expand Down Expand Up @@ -975,6 +989,87 @@ func testAllWantedEntriesAreAccessible(t *testing.T, first uint64, last uint64)
runLogDBTest(t, tf)
}

func TestRemoveEntriesTo(t *testing.T) {
defer leaktest.AfterTest(t)()
dir := "db-dir"
lldir := "wal-db-dir"
d := filepath.Join(RDBTestDirectory, dir)
lld := filepath.Join(RDBTestDirectory, lldir)
os.RemoveAll(d)
os.RemoveAll(lld)
defer os.RemoveAll(RDBTestDirectory)
clusterID := uint64(0)
nodeID := uint64(4)
ents := make([]pb.Entry, 0)
maxIndex := uint64(1024)
skipSizeCheck := false
func() {
db := getNewTestDB(dir, lldir, false)
sdb, ok := db.(*ShardedRDB)
if !ok {
t.Fatalf("failed to get sdb")
}
name := sdb.Name()
plog.Infof("name: %s", name)
skipSizeCheck = strings.Contains(name, "leveldb")
failed, err := sdb.SelfCheckFailed()
if err != nil || failed {
t.Fatalf("self check failed")
}
defer db.Close()
for i := uint64(0); i < maxIndex; i++ {
e := pb.Entry{
Term: 1,
Index: i,
Type: pb.ApplicationEntry,
Cmd: make([]byte, 1024*4),
}
ents = append(ents, e)
}
ud := pb.Update{
EntriesToSave: ents,
State: pb.State{Commit: 1},
ClusterID: clusterID,
NodeID: nodeID,
}
err = db.SaveRaftState([]pb.Update{ud}, newRDBContext(1, nil))
if err != nil {
t.Fatalf("failed to save recs")
}
if err := db.RemoveEntriesTo(clusterID, nodeID, maxIndex); err != nil {
t.Fatalf("failed to remove entries to, %v", err)
}
for i := 0; i < 1000; i++ {
if atomic.LoadUint64(&(sdb.completedCompactions)) == 0 {
time.Sleep(10 * time.Millisecond)
} else {
break
}
if i == 999 {
t.Fatalf("failed to trigger compaction")
}
}
results, _, err := db.IterateEntries(nil,
0, clusterID, nodeID, 1, 100, math.MaxUint64)
if len(results) > 0 {
t.Errorf("entries not deleted")
}
}()
// leveldb has the leftover ldb file
// https://github.com/google/leveldb/issues/573
// https://github.com/google/leveldb/issues/593
if !skipSizeCheck {
sz, err := getDirSize(RDBTestDirectory)
if err != nil {
t.Fatalf("failed to get sz %v", err)
}
plog.Infof("sz: %d", sz)
if sz > 1024*1024 {
t.Errorf("unexpected size, %d", sz)
}
}
}

func TestAllWantedEntriesAreAccessible(t *testing.T) {
testAllWantedEntriesAreAccessible(t, 1, 2)
testAllWantedEntriesAreAccessible(t, 3, batchSize/2)
Expand Down
7 changes: 3 additions & 4 deletions internal/logdb/sharded_rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ type ShardedRDB struct {
stopper *syncutil.Stopper
}

func checkAllShards(dirs []string,
lldirs []string, kvf kvFactory) (bool, error) {
func checkAllShards(dirs []string, lls []string, kvf kvFactory) (bool, error) {
for i := uint64(0); i < numOfRocksDBInstance; i++ {
dir := filepath.Join(dirs[i], fmt.Sprintf("logdb-%d", i))
lldir := ""
if len(lldirs) > 0 {
lldir = filepath.Join(lldirs[i], fmt.Sprintf("logdb-%d", i))
if len(lls) > 0 {
lldir = filepath.Join(lls[i], fmt.Sprintf("logdb-%d", i))
}
batched, err := hasBatchedRecord(dir, lldir, kvf)
if err != nil {
Expand Down

0 comments on commit 93840fb

Please sign in to comment.