Skip to content

Commit

Permalink
tools: added a tool to check whether it is safe to upgrade to v3.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Jul 4, 2019
1 parent 4c64732 commit d260629
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 19 deletions.
15 changes: 13 additions & 2 deletions internal/logdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func (r *rdb) importSnapshot(ss pb.Snapshot, nodeID uint64) error {
r.recordSnapshot(wb, pb.Update{
ClusterID: ss.ClusterId, NodeID: nodeID, Snapshot: ss,
})
r.recordMaxIndex(wb, ss.ClusterId, nodeID, ss.Index, nil)
return r.kvs.CommitWriteBatch(wb)
}

Expand Down Expand Up @@ -263,10 +264,20 @@ func (r *rdb) recordSnapshot(wb kv.IWriteBatch, ud pb.Update) {

func (r *rdb) recordMaxIndex(wb kv.IWriteBatch,
clusterID uint64, nodeID uint64, index uint64, ctx raftio.IContext) {
data := ctx.GetValueBuffer(8)
var data []byte
var k raftio.IReusableKey
if ctx != nil {
data = ctx.GetValueBuffer(8)
} else {
data = make([]byte, 8)
}
binary.BigEndian.PutUint64(data, index)
data = data[:8]
k := ctx.GetKey()
if ctx != nil {
k = ctx.GetKey()
} else {
k = newKey(maxKeySize, nil)
}
k.SetMaxIndexKey(clusterID, nodeID)
wb.Put(k.Key(), data)
}
Expand Down
22 changes: 22 additions & 0 deletions internal/logdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,28 @@ func TestImportSnapshot(t *testing.T) {
if state.State.Commit != snapshots[0].Index {
t.Errorf("unexpected commit value")
}
rdb := db.(*ShardedRDB).shards[2]
rdb.cs.maxIndex = make(map[raftio.NodeInfo]uint64)
maxIndex, err := rdb.readMaxIndex(clusterID, nodeID)
if err != nil {
t.Errorf("failed to get max index")
}
if maxIndex != ssimport.Index {
t.Errorf("unexpected max index value %d", maxIndex)
}
state, err = db.ReadRaftState(clusterID, nodeID, snapshots[0].Index)
if err != nil {
t.Fatalf("raft state not deleted %v", err)
}
if state == nil {
t.Fatalf("failed to get raft state")
}
if state.FirstIndex != snapshots[0].Index {
t.Errorf("first index: %d, ss index %d", state.FirstIndex, snapshots[0].Index)
}
if state.EntryCount != 0 {
t.Errorf("unexpected entry count %d", state.EntryCount)
}
}
runLogDBTest(t, tf)
}
95 changes: 78 additions & 17 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
pb "github.com/lni/dragonboat/v3/raftpb"
sm "github.com/lni/dragonboat/v3/statemachine"
"github.com/lni/dragonboat/v3/tools"
"github.com/lni/dragonboat/v3/tools/upgrade310"
)

var ovs = logdb.RDBContextValueSize
Expand Down Expand Up @@ -2282,12 +2283,42 @@ func TestSnapshotCanBeExported(t *testing.T) {
if !exist {
t.Errorf("snapshot metadata not saved")
}
var ss pb.Snapshot
if err := fileutil.GetFlagFileContent(filepath.Join(sspath, snapshotDir),
"snapshot.metadata", &ss); err != nil {
t.Fatalf("failed to get snapshot from its metadata file")
}
if ss.OnDiskIndex != 0 {
t.Errorf("on disk index is not 0")
}
if ss.Imported {
t.Errorf("incorrectly recorded as imported")
}
if ss.Type != pb.RegularStateMachine {
t.Errorf("incorrect type")
}
}
singleNodeHostTest(t, tf)
}

func TestOnDiskStateMachineCanExportSnapshot(t *testing.T) {
tf := func(t *testing.T, nh *NodeHost, initialApplied uint64) {
session := nh.GetNoOPSession(1)
proposed := false
for i := 0; i < 16; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
_, err := nh.SyncPropose(ctx, session, []byte("test-data"))
cancel()
if err == nil {
proposed = true
break
} else {
time.Sleep(100 * time.Millisecond)
}
}
if !proposed {
t.Fatalf("failed to make proposal")
}
sspath := "exported_snapshot_safe_to_delete"
os.RemoveAll(sspath)
if err := os.MkdirAll(sspath, 0755); err != nil {
Expand Down Expand Up @@ -2342,6 +2373,20 @@ func TestOnDiskStateMachineCanExportSnapshot(t *testing.T) {
if shrunk {
t.Errorf("exported snapshot is considered as shrunk")
}
var ss pb.Snapshot
if err := fileutil.GetFlagFileContent(filepath.Join(sspath, snapshotDir),
"snapshot.metadata", &ss); err != nil {
t.Fatalf("failed to get snapshot from its metadata file")
}
if ss.OnDiskIndex == 0 {
t.Errorf("on disk index is not recorded")
}
if ss.Imported {
t.Errorf("incorrectly recorded as imported")
}
if ss.Type != pb.OnDiskStateMachine {
t.Errorf("incorrect type")
}
}
singleFakeDiskNodeHostTest(t, tf, 0)
}
Expand Down Expand Up @@ -2443,29 +2488,45 @@ func testImportedSnapshotIsAlwaysRestored(t *testing.T, newDir bool) {
if err := tools.ImportSnapshot(nhc, dir, members, 1); err != nil {
t.Fatalf("failed to import snapshot %v", err)
}
rnh, err := NewNodeHost(nhc)
ok, err := upgrade310.CanUpgradeToV310(nhc)
if err != nil {
t.Fatalf("failed to create node host %v", err)
t.Errorf("failed to check whether upgrade is possible")
}
defer rnh.Stop()
rnewSM := func(uint64, uint64) sm.IOnDiskStateMachine {
return tests.NewSimDiskSM(applied)
}
if err := rnh.StartOnDiskCluster(nil, false, rnewSM, rc); err != nil {
t.Fatalf("failed to start cluster %v", err)
if ok {
t.Errorf("should not be considered as ok to upgrade")
}
waitForLeaderToBeElected(t, rnh, 1)
ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
rv, err = rnh.SyncRead(ctx, 1, nil)
cancel()
func() {
rnh, err := NewNodeHost(nhc)
if err != nil {
t.Fatalf("failed to create node host %v", err)
}
defer rnh.Stop()
rnewSM := func(uint64, uint64) sm.IOnDiskStateMachine {
return tests.NewSimDiskSM(applied)
}
if err := rnh.StartOnDiskCluster(nil, false, rnewSM, rc); err != nil {
t.Fatalf("failed to start cluster %v", err)
}
waitForLeaderToBeElected(t, rnh, 1)
ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
rv, err = rnh.SyncRead(ctx, 1, nil)
cancel()
if err != nil {
t.Fatalf("failed to read applied value %v", err)
}
if index != rv.(uint64) {
t.Fatalf("invalid returned value %d", rv.(uint64))
}
plog.Infof("checking proposes")
makeProposals(rnh)
}()
ok, err = upgrade310.CanUpgradeToV310(nhc)
if err != nil {
t.Fatalf("failed to read applied value %v", err)
t.Errorf("failed to check whether upgrade is possible")
}
if index != rv.(uint64) {
t.Fatalf("invalid returned value %d", rv.(uint64))
if !ok {
t.Errorf("can not upgrade")
}
plog.Infof("checking proposes")
makeProposals(rnh)
}
runNodeHostTest(t, tf)
}
Expand Down
85 changes: 85 additions & 0 deletions tools/upgrade310/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2017-2019 Lei Ni ([email protected]) and other Dragonboat authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package upgrade310

import (
"math"

"github.com/lni/dragonboat/v3/config"
"github.com/lni/dragonboat/v3/internal/logdb"
"github.com/lni/dragonboat/v3/internal/rsm"
"github.com/lni/dragonboat/v3/internal/server"
pb "github.com/lni/dragonboat/v3/raftpb"
)

// CanUpgradeToV310 determines whether your production dataset is safe to use
// the v3.0.3 or higher version of Dragonboat. You need to stop your NodeHost
// before invoking CanUpgradeToV310.
//
// CanUpgradeToV310 checks whether there is any snapshot that has already been
// streamed or imported but has not been fully applied into user state machine
// yet.
//
// The input parameter nhConfig should be the same NodeHostConfig instance you
// use to initiate your NodeHost object. CanUpgradeToV310 returns a boolean flag
// indicating whether it is safe to upgrade. If it returns false, you can
// restart your NodeHost using the existing version of Dragonboat, e.g. v3.0.2,
// to allow pending snapshots to be fully applied. Repeat the above steps until
// CanUpgradeToV310 returns true.
//
// Note that for the vast majority cases, CanUpgradeToV310 is expected to
// return true after its first run, which means it is safe to go ahead and
// upgrade the Dragonboat version.
func CanUpgradeToV310(nhConfig config.NodeHostConfig) (bool, error) {
if nhConfig.DeploymentID == 0 {
nhConfig.DeploymentID = 1
}
serverCtx, err := server.NewContext(nhConfig)
if err != nil {
return false, err
}
defer serverCtx.Stop()
if err := serverCtx.LockNodeHostDir(); err != nil {
return false, err
}
nhDir, walDir := serverCtx.GetLogDBDirs(nhConfig.DeploymentID)
logdb, err := logdb.NewDefaultLogDB(nhDir, walDir)
if err != nil {
return false, err
}
defer logdb.Close()
niList, err := logdb.ListNodeInfo()
if err != nil {
return false, err
}
for _, ni := range niList {
ssList, err := logdb.ListSnapshots(ni.ClusterID, ni.NodeID, math.MaxUint64)
if err != nil {
return false, err
}
for _, ss := range ssList {
if ss.Type == pb.OnDiskStateMachine && ss.OnDiskIndex == 0 {
shrunk, err := rsm.IsShrinkedSnapshotFile(ss.Filepath)
if err != nil {
return false, err
}
if !shrunk {
return false, nil
}
}
}
}
return true, nil
}

0 comments on commit d260629

Please sign in to comment.