Skip to content

Commit

Permalink
Complete partial downgraded cluster cancellation
Browse files Browse the repository at this point in the history
Signed-off-by: Chun-Hung Tseng <[email protected]>
Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
Chun-Hung Tseng committed Feb 4, 2025
1 parent 3cc3daf commit a15bcbd
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 29 deletions.
93 changes: 75 additions & 18 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package e2e
import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
Expand All @@ -43,41 +45,66 @@ const (
noCancellation CancellationState = iota
cancelRightBeforeEnable
cancelRightAfterEnable
cancelAfterDowngrading
)

func TestDowngradeUpgradeClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, false, noCancellation)
testDowngradeUpgrade(t, 1, 1, false, noCancellation)
}

func TestDowngradeUpgradeClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, false, noCancellation)
testDowngradeUpgrade(t, 3, 3, false, noCancellation)
}

func TestDowngradeUpgradeClusterOf1WithSnapshot(t *testing.T) {
testDowngradeUpgrade(t, 1, true, noCancellation)
testDowngradeUpgrade(t, 1, 1, true, noCancellation)
}

func TestDowngradeUpgradeClusterOf3WithSnapshot(t *testing.T) {
testDowngradeUpgrade(t, 3, true, noCancellation)
testDowngradeUpgrade(t, 3, 3, true, noCancellation)
}

func TestDowngradeCancellationWithoutEnablingClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, false, cancelRightBeforeEnable)
testDowngradeUpgrade(t, 0, 1, false, cancelRightBeforeEnable)
}

func TestDowngradeCancellationRightAfterEnablingClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, false, cancelRightAfterEnable)
testDowngradeUpgrade(t, 0, 1, false, cancelRightAfterEnable)
}

func TestDowngradeCancellationWithoutEnablingClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, false, cancelRightBeforeEnable)
testDowngradeUpgrade(t, 0, 3, false, cancelRightBeforeEnable)
}

func TestDowngradeCancellationRightAfterEnablingClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, false, cancelRightAfterEnable)
testDowngradeUpgrade(t, 0, 3, false, cancelRightAfterEnable)
}

func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, triggerCancellation CancellationState) {
func TestDowngradeCancellationAfterDowngrading1InClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 1, 3, false, cancelAfterDowngrading)
}

func TestDowngradeCancellationAfterDowngrading2InClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 2, 3, false, cancelAfterDowngrading)
}

func TestDowngradeCancellationAfterDowngrading1InClusterOf5(t *testing.T) {
testDowngradeUpgrade(t, 1, 5, false, cancelAfterDowngrading)
}

func TestDowngradeCancellationAfterDowngrading2InClusterOf5(t *testing.T) {
testDowngradeUpgrade(t, 2, 5, false, cancelAfterDowngrading)
}

func TestDowngradeCancellationAfterDowngrading3InClusterOf5(t *testing.T) {
testDowngradeUpgrade(t, 3, 5, false, cancelAfterDowngrading)
}

func TestDowngradeCancellationAfterDowngrading4InClusterOf5(t *testing.T) {
testDowngradeUpgrade(t, 4, 5, false, cancelAfterDowngrading)
}

func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterSize int, triggerSnapshot bool, triggerCancellation CancellationState) {
currentEtcdBinary := e2e.BinPath.Etcd
lastReleaseBinary := e2e.BinPath.EtcdLastRelease
if !fileutil.Exist(lastReleaseBinary) {
Expand Down Expand Up @@ -135,7 +162,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
t.Logf("Cancelling downgrade before enabling")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersionStr))
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))

return // No need to perform downgrading, end the test here
}
Expand All @@ -144,14 +171,19 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersionStr))
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
return // No need to perform downgrading, end the test here
}

membersToChange := rand.Perm(len(epc.Procs))[:numberOfMembersToDowngrade]
t.Logf(fmt.Sprintln("Elect members for operations"), zap.Any("members", membersToChange))

t.Logf("Starting downgrade process to %q", lastVersionStr)
err = e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
require.NoError(t, err)
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
if len(membersToChange) == len(epc.Procs) {
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
}

t.Log("Downgrade complete")
afterMembers, afterKV := getMembersAndKeys(t, cc)
Expand All @@ -162,6 +194,13 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}

if triggerCancellation == cancelAfterDowngrading {
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generatePartialCancellationVersions(clusterSize, membersToChange, lastClusterVersion))
}

t.Log("Adding learner to test membership, but avoid breaking quorum")
resp, err = cc.MemberAddAsLearner(context.Background(), "fake2", []string{"http://127.0.0.1:1002"})
require.NoError(t, err)
Expand All @@ -176,7 +215,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
beforeMembers, beforeKV = getMembersAndKeys(t, cc)

t.Logf("Starting upgrade process to %q", currentVersionStr)
err = e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, lastClusterVersion, currentVersion)
require.NoError(t, err)
t.Log("Upgrade complete")

Expand Down Expand Up @@ -276,16 +315,34 @@ func getMembersAndKeys(t *testing.T, cc *e2e.EtcdctlV3) (*clientv3.MemberListRes
return members, kvs
}

func generateIdenticalVersions(clusterSize int, currentVersion string) []*version.Versions {
func generateIdenticalVersions(clusterSize int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: currentVersion,
Server: currentVersion,
Storage: currentVersion,
Cluster: ver.String(),
Server: ver.String(),
Storage: ver.String(),
}
}

return ret
}

func generatePartialCancellationVersions(clusterSize int, membersToChange []int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: ver.String(),
Server: e2e.OffsetMinor(ver, 1).String(),
Storage: "",
}
}

for i := range membersToChange {
ret[membersToChange[i]].Server = ver.String()
}

return ret
}
46 changes: 35 additions & 11 deletions tests/framework/e2e/downgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
for i := 0; i < len(epc.Procs); i++ {
ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: ver.String(),
Server: offsetMinor(ver, 1).String(),
Server: OffsetMinor(ver, 1).String(),
Storage: ver.String(),
})
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
Expand All @@ -53,17 +53,43 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
}

func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
t.Logf("etcdctl downgrade cancel")
c := epc.Etcdctl()
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
err := c.DowngradeCancel(context.TODO())
require.NoError(t, err)

var err error
testutils.ExecuteWithTimeout(t, 2*time.Minute, func() {
for {
t.Logf("etcdctl downgrade cancel")
err = c.DowngradeCancel(context.TODO())
if err != nil {
if strings.Contains(err.Error(), "no inflight downgrade job") {
// cancellation has been performed successfully
t.Log(err)
err = nil
break
}

t.Logf("etcdctl downgrade error: %v, retrying", err)
continue
}

t.Logf("etcdctl downgrade cancel executed successfully")
break
}
})

require.NoError(t, err)

t.Log("Cluster downgrade cancellation is completed")
}

func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
t.Logf(fmt.Sprintln("Elect members for operations"), zap.Any("members", membersToChange))

return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, currentVersion, targetVersion)
}

func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, currentVersion, targetVersion *semver.Version) error {
if lg == nil {
lg = clus.lg
}
Expand All @@ -74,8 +100,6 @@ func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessClus
opString = "downgrading"
newExecPath = BinPath.EtcdLastRelease
}
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))

for _, memberID := range membersToChange {
member := clus.Procs[memberID]
Expand All @@ -96,7 +120,7 @@ func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessClus
lg.Info("Validating versions")
for _, memberID := range membersToChange {
member := clus.Procs[memberID]
if isDowngrade || numberOfMembersToChange == len(clus.Procs) {
if isDowngrade || len(membersToChange) == len(clus.Procs) {
ValidateVersion(t, clus.Cfg, member, version.Versions{
Cluster: targetVersion.String(),
Server: targetVersion.String(),
Expand All @@ -119,7 +143,7 @@ func ValidateMemberVersions(t *testing.T, epc *EtcdProcessCluster, expect []*ver
}

func ValidateVersion(t *testing.T, cfg *EtcdProcessClusterConfig, member EtcdProcess, expect version.Versions) {
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
testutils.ExecuteWithTimeout(t, 2*time.Minute, func() {
for {
result, err := getMemberVersionByCurl(cfg, member)
if err != nil {
Expand All @@ -139,8 +163,8 @@ func ValidateVersion(t *testing.T, cfg *EtcdProcessClusterConfig, member EtcdPro
})
}

// offsetMinor returns the version with offset from the original minor, with the same major.
func offsetMinor(v *semver.Version, offset int) *semver.Version {
// OffsetMinor returns the version with offset from the original minor, with the same major.
func OffsetMinor(v *semver.Version, offset int) *semver.Version {
var minor int64
if offset >= 0 {
minor = v.Minor + int64(offset)
Expand Down

0 comments on commit a15bcbd

Please sign in to comment.