Skip to content

Commit

Permalink
rsm,tools: fixed #77, #78, #79
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Jun 30, 2019
1 parent 5c22480 commit c945600
Show file tree
Hide file tree
Showing 19 changed files with 1,187 additions and 647 deletions.
3 changes: 1 addition & 2 deletions internal/cpp/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ func (ds *StateMachineWrapper) OnDiskStateMachine() bool {

// RecoverFromSnapshot recovers the state of the data store from the snapshot
// file specified by the fp input string.
func (ds *StateMachineWrapper) RecoverFromSnapshot(index uint64,
reader *rsm.SnapshotReader,
func (ds *StateMachineWrapper) RecoverFromSnapshot(reader *rsm.SnapshotReader,
files []sm.SnapshotFile) error {
ds.ensureNotDestroyed()
cf := C.GetCollectedFile()
Expand Down
2 changes: 1 addition & 1 deletion internal/cpp/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestCppSnapshotWorks(t *testing.T) {
t.Fatalf("%v", err)
}
reader.ValidateHeader(header)
err = ds2.RecoverFromSnapshot(0, reader, nil)
err = ds2.RecoverFromSnapshot(reader, nil)
if err != nil {
t.Errorf("failed to recover from snapshot %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/rsm/chunkwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (cw *ChunkWriter) getChunk() pb.SnapshotChunk {
FileChunkId: cw.chunkID,
Index: cw.meta.Index,
Term: cw.meta.Term,
OnDiskIndex: cw.meta.OnDiskIndex,
Membership: cw.meta.Membership,
BinVer: raftio.RPCBinVersion,
Filepath: server.GetSnapshotFilename(cw.meta.Index),
Expand Down
10 changes: 5 additions & 5 deletions internal/rsm/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ISavable interface {
// ILoadableSM is the interface for types that can have its state restored from
// snapshots.
type ILoadableSM interface {
RecoverFromSnapshot(uint64, *SnapshotReader, []sm.SnapshotFile) error
RecoverFromSnapshot(*SnapshotReader, []sm.SnapshotFile) error
}

// ILoadableSessions is the interface for types that can load client session
Expand All @@ -63,7 +63,7 @@ type IManagedStateMachine interface {
PrepareSnapshot() (interface{}, error)
SaveSnapshot(*SnapshotMeta,
*SnapshotWriter, []byte, sm.ISnapshotFileCollection) (bool, uint64, error)
RecoverFromSnapshot(uint64, *SnapshotReader, []sm.SnapshotFile) error
RecoverFromSnapshot(*SnapshotReader, []sm.SnapshotFile) error
StreamSnapshot(interface{}, *ChunkWriter) error
Offloaded(From)
Loaded(From)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (ds *NativeStateMachine) StreamSnapshot(ssctx interface{},

// RecoverFromSnapshot recovers the state of the data store from the snapshot
// file specified by the fp input string.
func (ds *NativeStateMachine) RecoverFromSnapshot(index uint64,
reader *SnapshotReader, files []sm.SnapshotFile) error {
return ds.sm.RecoverFromSnapshot(index, reader, files, ds.done)
func (ds *NativeStateMachine) RecoverFromSnapshot(reader *SnapshotReader,
files []sm.SnapshotFile) error {
return ds.sm.RecoverFromSnapshot(reader, files, ds.done)
}
54 changes: 23 additions & 31 deletions internal/rsm/sm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type IStateMachine interface {
PrepareSnapshot() (interface{}, error)
SaveSnapshot(interface{},
io.Writer, sm.ISnapshotFileCollection, <-chan struct{}) error
RecoverFromSnapshot(uint64, io.Reader, []sm.SnapshotFile, <-chan struct{}) error
RecoverFromSnapshot(io.Reader, []sm.SnapshotFile, <-chan struct{}) error
Close() error
GetHash() (uint64, error)
ConcurrentSnapshot() bool
Expand Down Expand Up @@ -110,8 +110,8 @@ func (s *RegularStateMachine) SaveSnapshot(ctx interface{},
}

// RecoverFromSnapshot recovers the state machine from a snapshot.
func (s *RegularStateMachine) RecoverFromSnapshot(index uint64,
r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error {
func (s *RegularStateMachine) RecoverFromSnapshot(r io.Reader,
fs []sm.SnapshotFile, stopc <-chan struct{}) error {
return s.sm.RecoverFromSnapshot(r, fs, stopc)
}

Expand Down Expand Up @@ -208,8 +208,8 @@ func (s *ConcurrentStateMachine) SaveSnapshot(ctx interface{},
}

// RecoverFromSnapshot recovers the state machine from a snapshot.
func (s *ConcurrentStateMachine) RecoverFromSnapshot(index uint64,
r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error {
func (s *ConcurrentStateMachine) RecoverFromSnapshot(r io.Reader,
fs []sm.SnapshotFile, stopc <-chan struct{}) error {
return s.sm.RecoverFromSnapshot(r, fs, stopc)
}

Expand Down Expand Up @@ -246,12 +246,10 @@ func (s *ConcurrentStateMachine) StateMachineType() pb.StateMachineType {

// OnDiskStateMachine is the type to represent an on disk state machine.
type OnDiskStateMachine struct {
sm sm.IOnDiskStateMachine
h sm.IHash
na sm.IExtended
opened bool
initialIndex uint64
applied uint64
sm sm.IOnDiskStateMachine
h sm.IHash
na sm.IExtended
opened bool
}

// NewOnDiskStateMachine creates and returns an on disk state machine.
Expand All @@ -278,8 +276,6 @@ func (s *OnDiskStateMachine) Open(stopc <-chan struct{}) (uint64, error) {
if err != nil {
return 0, err
}
s.initialIndex = applied
s.applied = applied
return applied, nil
}

Expand All @@ -288,17 +284,6 @@ func (s *OnDiskStateMachine) Update(entries []sm.Entry) ([]sm.Entry, error) {
if !s.opened {
panic("Update called before Open")
}
if len(entries) > 0 {
if entries[len(entries)-1].Index <= s.initialIndex {
plog.Panicf("last entry index to apply %d, initial index %d",
entries[len(entries)-1].Index, s.initialIndex)
}
if entries[len(entries)-1].Index <= s.applied {
plog.Panicf("last entry index to apply %d, applied %d",
entries[len(entries)-1].Index, s.applied)
}
s.applied = entries[len(entries)-1].Index
}
return s.sm.Update(entries)
}

Expand Down Expand Up @@ -345,16 +330,23 @@ func (s *OnDiskStateMachine) SaveSnapshot(ctx interface{},
}

// RecoverFromSnapshot recovers the state machine from a snapshot.
func (s *OnDiskStateMachine) RecoverFromSnapshot(index uint64,
r io.Reader, fs []sm.SnapshotFile, stopc <-chan struct{}) error {
func (s *OnDiskStateMachine) RecoverFromSnapshot(r io.Reader,
fs []sm.SnapshotFile, stopc <-chan struct{}) error {
if !s.opened {
panic("RecoverFromSnapshot called when not opened")
}
if index <= s.applied {
plog.Panicf("recover snapshot moving applied index backwards, %d, %d",
index, s.applied)
}
s.applied = index
/*
rollback := ss.Imported && init
if !rollback {
if ss.StateMachineIndex <= s.index {
plog.Panicf("recover snapshot moving applied index backwards, %d, %d",
ss.StateMachineIndex, s.index)
}
} else {
s.initialIndex = ss.StateMachineIndex
}
s.index = ss.StateMachineIndex
*/
return s.sm.RecoverFromSnapshot(r, stopc)
}

Expand Down
132 changes: 1 addition & 131 deletions internal/rsm/sm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

"github.com/lni/dragonboat/v3/internal/tests"
sm "github.com/lni/dragonboat/v3/statemachine"
)

func TestOnDiskSMCanBeOpened(t *testing.T) {
Expand All @@ -33,12 +32,6 @@ func TestOnDiskSMCanBeOpened(t *testing.T) {
if idx != applied {
t.Errorf("unexpected idx %d", idx)
}
if od.initialIndex != applied {
t.Errorf("initial index not recorded %d, want %d", od.initialIndex, applied)
}
if od.applied != applied {
t.Errorf("applied not recorded %d, want %d", od.applied, applied)
}
}

func TestOnDiskSMCanNotBeOpenedMoreThanOnce(t *testing.T) {
Expand All @@ -62,108 +55,6 @@ func TestOnDiskSMCanNotBeOpenedMoreThanOnce(t *testing.T) {
}
}

func TestOnDiskSMRecordAppliedIndex(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
od := NewOnDiskStateMachine(fd)
idx, err := od.Open(nil)
if err != nil {
t.Fatalf("failed to open %v", err)
}
if idx != applied {
t.Errorf("unexpected idx %d", idx)
}
if od.applied != applied {
t.Errorf("applied not recorded %d, want %d", od.applied, applied)
}
entries := []sm.Entry{
{Index: applied + 1},
{Index: applied + 2},
{Index: applied + 3},
}
if _, err := od.Update(entries); err != nil {
t.Fatalf("update failed %v", err)
}
if od.applied != applied+uint64(len(entries)) {
t.Errorf("applied value not recorded")
}
}

func TestUpdateAnUnopenedOnDiskSMWillPanic(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
od := NewOnDiskStateMachine(fd)
entries := []sm.Entry{
{Index: applied + 1},
{Index: applied + 2},
{Index: applied + 3},
}
defer func() {
if r := recover(); r == nil {
t.Errorf("no panic")
}
}()
if _, err := od.Update(entries); err != nil {
t.Fatalf("update failed %v", err)
}
}

func TestUpdateOnDiskSMWithAppliedIndexWillPanic(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
od := NewOnDiskStateMachine(fd)
_, err := od.Open(nil)
if err != nil {
t.Fatalf("failed to open %v", err)
}
entries := []sm.Entry{{Index: applied + 1}}
if _, err := od.Update(entries); err != nil {
t.Fatalf("update failed %v", err)
}
defer func() {
if r := recover(); r == nil {
t.Errorf("no panic")
}
}()
if _, err := od.Update(entries); err != nil {
t.Fatalf("update failed %v", err)
}
}

func TestUpdateOnDiskSMWithIndexLessThanInitialIndexWillPanic(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
od := NewOnDiskStateMachine(fd)
_, err := od.Open(nil)
if err != nil {
t.Fatalf("failed to open %v", err)
}
od.applied = od.applied + 1
entries := []sm.Entry{{Index: applied}}
defer func() {
if r := recover(); r == nil {
t.Errorf("no panic")
}
}()
if _, err := od.Update(entries); err != nil {
t.Fatalf("update failed %v", err)
}
}

func TestLookupCalledBeforeOnDiskSMIsOpenedWillPanic(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
od := NewOnDiskStateMachine(fd)
defer func() {
if r := recover(); r == nil {
t.Errorf("no panic")
}
}()
if _, err := od.Lookup(nil); err != nil {
t.Fatalf("lookup failed %v", err)
}
}

func TestLookupCanBeCalledOnceOnDiskSMIsOpened(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
Expand All @@ -189,28 +80,7 @@ func TestRecoverFromSnapshotCanComplete(t *testing.T) {
buf := make([]byte, 16)
reader := bytes.NewBuffer(buf)
stopc := make(chan struct{})
if err := od.RecoverFromSnapshot(applied+1, reader, nil, stopc); err != nil {
if err := od.RecoverFromSnapshot(reader, nil, stopc); err != nil {
t.Errorf("recover from snapshot failed %v", err)
}
}

func TestRecoverFromSnapshotWillPanicWhenIndexIsLessThanApplied(t *testing.T) {
applied := uint64(123)
fd := tests.NewFakeDiskSM(applied)
od := NewOnDiskStateMachine(fd)
_, err := od.Open(nil)
if err != nil {
t.Fatalf("failed to open %v", err)
}
buf := make([]byte, 16)
reader := bytes.NewBuffer(buf)
stopc := make(chan struct{})
defer func() {
if r := recover(); r == nil {
t.Errorf("no panic")
}
}()
if err := od.RecoverFromSnapshot(applied-1, reader, nil, stopc); err != nil {
t.Fatalf("recover failed %v", err)
}
}
Loading

0 comments on commit c945600

Please sign in to comment.