Skip to content

Commit

Permalink
Make StatefulSet restart pods with phase Succeeded (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Mar 18, 2024
1 parent 90a78e3 commit a2789df
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 24 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ test-client:
.PHONY: test-client

test-integration: openapi-spec
@echo "skip integration tests now, ref https://github.com/kubernetes/kubernetes/issues/119220"
# hack/make-rules/test-integration.sh $(WHAT)
hack/make-rules/test-integration.sh $(WHAT)
.PHONY: test-integration

e2e:
Expand Down
28 changes: 21 additions & 7 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,27 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
if replicas[i] == nil {
continue
}
// delete and recreate failed pods
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
// Delete and recreate pods which finished running.
//
// Note that pods with phase Succeeded are also deleted and recreated (reported
// with a RecreatingTerminatedPod event instead of RecreatingFailedPod). This is
// because the final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
}
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return &status, err
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestStatefulSetControl(t *testing.T) {
{ScalesDown, simpleSetFn},
{ReplacesPods, largeSetFn},
{RecreatesFailedPod, simpleSetFn},
{RecreatesSucceededPod, simpleSetFn},
{CreatePodFailure, simpleSetFn},
{UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn},
Expand Down Expand Up @@ -267,7 +268,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
}
}

func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) {
client := fake.NewSimpleClientset()
pcClient := pcfake.NewSimpleClientset(set)
spc, _, ssc, stop := setupController(pcClient, client)
Expand All @@ -290,7 +291,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
if err != nil {
t.Error(err)
}
pods[0].Status.Phase = v1.PodFailed
pods[0].Status.Phase = phase
spc.podsIndexer.Update(pods[0])
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
Expand All @@ -307,6 +308,14 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
}
}

// RecreatesFailedPod runs the shared recreatesPod scenario with the pod phase
// set to v1.PodFailed.
func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
	const phase = v1.PodFailed
	recreatesPod(t, set, invariants, phase)
}

// RecreatesSucceededPod runs the shared recreatesPod scenario with the pod
// phase set to v1.PodSucceeded.
func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
	const phase = v1.PodSucceeded
	recreatesPod(t, set, invariants, phase)
}

func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset()
pcClient := pcfake.NewSimpleClientset(set)
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/statefulset/stateful_set_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ func isFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
}

// isSucceeded reports whether the pod's status phase is v1.PodSucceeded.
func isSucceeded(pod *v1.Pod) bool {
	phase := pod.Status.Phase
	return phase == v1.PodSucceeded
}

// isTerminating returns true if pod's DeletionTimestamp has been set
func isTerminating(pod *v1.Pod) bool {
return pod.DeletionTimestamp != nil
Expand Down
53 changes: 40 additions & 13 deletions test/integration/statefulset/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
integrationutil "github.com/pingcap/advanced-statefulset/test/integration/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -79,17 +80,19 @@ func TestDeletingAndFailedPods(t *testing.T) {
stopCh := runControllerAndInformers(rm, informers, pcinformers)
defer close(stopCh)

podCount := 3

labelMap := labelMap()
sts := newSTS("sts", ns.Name, 2)
sts := newSTS("sts", ns.Name, podCount)
stss, _ := createSTSsPods(t, c, pcc, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
sts = stss[0]
waitSTSStable(t, pcc, sts)

// Verify STS creates 2 pods
// Verify STS creates 3 pods
podClient := c.CoreV1().Pods(ns.Name)
pods := getPods(t, podClient, labelMap)
if len(pods.Items) != 2 {
t.Fatalf("len(pods) = %d, want 2", len(pods.Items))
if len(pods.Items) != podCount {
t.Fatalf("len(pods) = %d, want %d", len(pods.Items), podCount)
}

// Set first pod as deleting pod
Expand All @@ -108,23 +111,47 @@ func TestDeletingAndFailedPods(t *testing.T) {
pod.Status.Phase = v1.PodFailed
})

// Set third pod as succeeded pod
succeededPod := &pods.Items[2]
updatePodStatus(t, podClient, succeededPod.Name, func(pod *v1.Pod) {
pod.Status.Phase = v1.PodSucceeded
})

// exists reports whether any pod in the given slice has the given UID.
exists := func(pods []v1.Pod, uid types.UID) bool {
	// Index into the slice so each v1.Pod is inspected in place.
	for i := range pods {
		if pods[i].UID == uid {
			return true
		}
	}
	return false
}

if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
// Verify only 2 pods exist: deleting pod and new pod replacing failed pod
// Verify only 3 pods exist: deleting pod, new pod replacing failed pod, and new pod replacing succeeded pod
pods = getPods(t, podClient, labelMap)
if len(pods.Items) != 2 {
if len(pods.Items) != podCount {
return false, nil
}
// Verify deleting pod still exists
// Immediately return false with an error if it does not exist
if pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID {
if !exists(pods.Items, deletingPod.UID) {
return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name)
}
// Verify failed pod does not exist anymore
if pods.Items[0].UID == failedPod.UID || pods.Items[1].UID == failedPod.UID {
if exists(pods.Items, failedPod.UID) {
return false, nil
}
// Verify both pods have non-failed status
return pods.Items[0].Status.Phase != v1.PodFailed && pods.Items[1].Status.Phase != v1.PodFailed, nil
// Verify succeeded pod does not exist anymore
if exists(pods.Items, succeededPod.UID) {
return false, nil
}
// Verify all pods have non-terminated status
for _, pod := range pods.Items {
if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err)
}
Expand All @@ -135,13 +162,13 @@ func TestDeletingAndFailedPods(t *testing.T) {
})

if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
// Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod
// Verify only 3 pods exist: new non-deleting pod replacing deleting pod and the two pods that replaced the failed and succeeded pods
pods = getPods(t, podClient, labelMap)
if len(pods.Items) != 2 {
if len(pods.Items) != podCount {
return false, nil
}
// Verify deleting pod does not exist anymore
return pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID, nil
return !exists(pods.Items, deletingPod.UID), nil
}); err != nil {
t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err)
}
Expand Down

0 comments on commit a2789df

Please sign in to comment.