From 825515c54135543762f1c4aa1cb152d2514d8684 Mon Sep 17 00:00:00 2001
From: wangyelei
Date: Wed, 19 Jun 2024 14:06:50 +0800
Subject: [PATCH] fix: h-scale ops failed but pod is created successfully
 (#7564)

---
 .../apps/operations/horizontal_scaling.go     |  4 +-
 .../apps/operations/ops_comp_helper.go        | 55 +++++++++++--------
 .../apps/operations/ops_progress_util.go      | 22 +-------
 controllers/apps/operations/ops_util.go       | 11 ----
 controllers/apps/operations/ops_util_test.go  | 42 ++++++++++----
 pkg/testutil/k8s/pod_util.go                  | 23 ++++++++
 6 files changed, 88 insertions(+), 69 deletions(-)

diff --git a/controllers/apps/operations/horizontal_scaling.go b/controllers/apps/operations/horizontal_scaling.go
index 7d2927c29d3..5a213535f07 100644
--- a/controllers/apps/operations/horizontal_scaling.go
+++ b/controllers/apps/operations/horizontal_scaling.go
@@ -140,9 +140,7 @@ func (hs horizontalScalingOpsHandler) ReconcileAction(reqCtx intctrlutil.Request
         lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[pgRes.compOps.GetComponentName()]
         horizontalScaling := pgRes.compOps.(appsv1alpha1.HorizontalScaling)
         pgRes.createdPodSet, pgRes.deletedPodSet = hs.getCreateAndDeletePodSet(opsRes, lastCompConfiguration, *pgRes.clusterComponent, horizontalScaling, pgRes.fullComponentName)
-        if horizontalScaling.Replicas == nil {
-            pgRes.noWaitComponentCompleted = true
-        }
+        pgRes.noWaitComponentCompleted = true
         return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus)
     }
     compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
diff --git a/controllers/apps/operations/ops_comp_helper.go b/controllers/apps/operations/ops_comp_helper.go
index 94ec5e917ec..fc223e74aee 100644
--- a/controllers/apps/operations/ops_comp_helper.go
+++ b/controllers/apps/operations/ops_comp_helper.go
@@ -26,11 +26,11 @@ import (
     "slices"
     "time"
 
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
     appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
     "github.com/apecloud/kubeblocks/pkg/constant"
+    "github.com/apecloud/kubeblocks/pkg/controller/component"
     intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
 )
 
@@ -134,6 +134,15 @@ func (c componentOpsHelper) cancelComponentOps(ctx context.Context,
     return cli.Update(ctx, opsRes.Cluster)
 }
 
+func (c componentOpsHelper) existFailure(ops *appsv1alpha1.OpsRequest, componentName string) bool {
+    for _, v := range ops.Status.Components[componentName].ProgressDetails {
+        if v.Status == appsv1alpha1.FailedProgressStatus {
+            return true
+        }
+    }
+    return false
+}
+
 // reconcileActionWithComponentOps will be performed when action is done and loops till OpsRequest.status.phase is Succeed/Failed.
 // the common function to reconcile opsRequest status when the opsRequest will affect the lifecycle of the components.
 func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.RequestCtx,
@@ -148,7 +157,6 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
     var (
         opsRequestPhase        = appsv1alpha1.OpsRunningPhase
         opsRequest             = opsRes.OpsRequest
-        isFailed               bool
         expectProgressCount    int32
         completedProgressCount int32
         requeueTimeAfterFailed time.Duration
@@ -225,7 +233,8 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
             }
         }
     }
-    var waitComponentCompleted bool
+    opsIsCompleted := true
+    existFailure := false
     for i := range progressResources {
         pgResource := progressResources[i]
         opsCompStatus := opsRequest.Status.Components[pgResource.compOps.GetComponentName()]
@@ -235,29 +244,29 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
         }
         expectProgressCount += expectCount
         completedProgressCount += completedCount
+        if c.existFailure(opsRes.OpsRequest, pgResource.compOps.GetComponentName()) {
+            existFailure = true
+        }
+        componentPhase := opsRes.Cluster.Status.Components[pgResource.compOps.GetComponentName()].Phase
         if !pgResource.isShardingComponent {
-            lastFailedTime := opsCompStatus.LastFailedTime
-            componentPhase := opsRes.Cluster.Status.Components[pgResource.compOps.GetComponentName()].Phase
-            if isFailedOrAbnormal(componentPhase) {
-                isFailed = true
-                if lastFailedTime.IsZero() {
-                    lastFailedTime = metav1.Now()
-                }
-                if time.Now().Before(lastFailedTime.Add(componentFailedTimeout)) {
-                    requeueTimeAfterFailed = componentFailedTimeout - time.Since(lastFailedTime.Time)
-                }
-            } else if !lastFailedTime.IsZero() {
-                // reset lastFailedTime if component is not failed
-                lastFailedTime = metav1.Time{}
-            }
             if opsCompStatus.Phase != componentPhase {
                 opsCompStatus.Phase = componentPhase
-                opsCompStatus.LastFailedTime = lastFailedTime
             }
-            // wait the component to complete
-            if !pgResource.noWaitComponentCompleted && !slices.Contains(appsv1alpha1.GetComponentTerminalPhases(), componentPhase) {
-                waitComponentCompleted = true
+        } else {
+            compObj, err := component.GetComponentByName(reqCtx.Ctx, cli, opsRes.Cluster.Namespace,
+                constant.GenerateClusterComponentName(opsRes.Cluster.Name, pgResource.fullComponentName))
+            if err != nil {
+                return opsRequestPhase, 0, err
             }
+            componentPhase = compObj.Status.Phase
+        }
+        // conditions whether ops is running:
+        // 1. completedProgressCount is not equal to expectProgressCount when the ops do not need to wait component phase to a terminal phase.
+        // 2. the component phase is not a terminal phase.
+        // 3. no completed progresses
+        if (pgResource.noWaitComponentCompleted && expectCount != completedCount) ||
+            !slices.Contains(appsv1alpha1.GetComponentTerminalPhases(), componentPhase) || completedCount == 0 {
+            opsIsCompleted = false
         }
         opsRequest.Status.Components[pgResource.compOps.GetComponentName()] = opsCompStatus
     }
@@ -268,10 +277,10 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
             return opsRequestPhase, 0, err
         }
     }
-    if waitComponentCompleted || completedProgressCount != expectProgressCount {
+    if !opsIsCompleted {
         return opsRequestPhase, 0, nil
     }
-    if isFailed {
+    if existFailure {
         if requeueTimeAfterFailed != 0 {
             // component failure may be temporary, waiting for component failure timeout.
             return opsRequestPhase, requeueTimeAfterFailed, nil
diff --git a/controllers/apps/operations/ops_progress_util.go b/controllers/apps/operations/ops_progress_util.go
index 65b7ed8c696..354e020be43 100644
--- a/controllers/apps/operations/ops_progress_util.go
+++ b/controllers/apps/operations/ops_progress_util.go
@@ -320,13 +320,9 @@ func handleFailedOrProcessingProgressDetail(opsRes *OpsResource,
     progressDetail appsv1alpha1.ProgressStatusDetail,
     pod *corev1.Pod) (completedCount int32) {
     componentName := pgRes.clusterComponent.Name
-    opsStartTime := opsRes.OpsRequest.Status.StartTimestamp
-    if podIsFailedDuringOperation(opsStartTime, pod, compStatus.Phase) {
+    isFailed, isTimeout, _ := intctrlutil.IsPodFailedAndTimedOut(pod)
+    if isFailed && isTimeout {
         podMessage := getFailedPodMessage(opsRes.Cluster, componentName, pod)
-        // if the pod is not failed, return
-        if len(podMessage) == 0 {
-            return
-        }
         message := getProgressFailedMessage(pgRes.opsMessageKey, progressDetail.ObjectKey, componentName, podMessage)
         progressDetail.SetStatusAndMessage(appsv1alpha1.FailedProgressStatus, message)
         completedCount = 1
@@ -344,20 +340,6 @@ func podIsPendingDuringOperation(opsStartTime metav1.Time, pod *corev1.Pod) bool
     return pod.CreationTimestamp.Before(&opsStartTime) && pod.DeletionTimestamp.IsZero()
 }
 
-// podIsFailedDuringOperation checks if pod is failed during operation.
-func podIsFailedDuringOperation(
-    opsStartTime metav1.Time,
-    pod *corev1.Pod,
-    componentPhase appsv1alpha1.ClusterComponentPhase) bool {
-    if !isFailedOrAbnormal(componentPhase) {
-        return false
-    }
-    // When the component is running and the pod has been created after opsStartTime,
-    // but it does not meet the success condition, it indicates that the changes made
-    // to the operations have been overwritten, resulting in a failed status.
-    return !pod.CreationTimestamp.Before(&opsStartTime) && componentPhase == appsv1alpha1.RunningClusterCompPhase
-}
-
 // podProcessedSuccessful checks if the pod has been processed successfully:
 // 1. the pod is recreated after OpsRequest.status.startTime and pod is available.
 // 2. the component is running and pod is available.
diff --git a/controllers/apps/operations/ops_util.go b/controllers/apps/operations/ops_util.go
index e0448552012..fe25d271d03 100644
--- a/controllers/apps/operations/ops_util.go
+++ b/controllers/apps/operations/ops_util.go
@@ -37,11 +37,6 @@ import (
     intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
 )
 
-const (
-    // componentFailedTimeout when the duration of component failure exceeds this threshold, it is determined that opsRequest has failed
-    componentFailedTimeout = 30 * time.Second
-)
-
 var _ error = &WaitForClusterPhaseErr{}
 
 type WaitForClusterPhaseErr struct {
@@ -62,12 +57,6 @@ type handleStatusProgressWithComponent func(reqCtx intctrlutil.RequestCtx,
 
 type handleReconfigureOpsStatus func(cmStatus *appsv1alpha1.ConfigurationItemStatus) error
 
-func isFailedOrAbnormal(phase appsv1alpha1.ClusterComponentPhase) bool {
-    return slices.Index([]appsv1alpha1.ClusterComponentPhase{
-        appsv1alpha1.FailedClusterCompPhase,
-        appsv1alpha1.AbnormalClusterCompPhase}, phase) != -1
-}
-
 // getClusterDefByName gets the ClusterDefinition object by the name.
 func getClusterDefByName(ctx context.Context, cli client.Client, clusterDefName string) (*appsv1alpha1.ClusterDefinition, error) {
     clusterDef := &appsv1alpha1.ClusterDefinition{}
diff --git a/controllers/apps/operations/ops_util_test.go b/controllers/apps/operations/ops_util_test.go
index 73faf2c105a..3ee90bfc2db 100644
--- a/controllers/apps/operations/ops_util_test.go
+++ b/controllers/apps/operations/ops_util_test.go
@@ -35,6 +35,7 @@ import (
     intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
     "github.com/apecloud/kubeblocks/pkg/generics"
     testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
+    testk8s "github.com/apecloud/kubeblocks/pkg/testutil/k8s"
 )
 
 var _ = Describe("OpsUtil functions", func() {
@@ -62,6 +63,7 @@ var _ = Describe("OpsUtil functions", func() {
         // namespaced
         testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.InstanceSetSignature, true, inNS, ml)
         testapps.ClearResources(&testCtx, generics.ConfigMapSignature, inNS, ml)
+        testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml)
     }
 
     BeforeEach(cleanEnv)
@@ -89,13 +91,15 @@ var _ = Describe("OpsUtil functions", func() {
             By("init operations resources ")
             opsRes, _, _ := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)
             testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp)
-
+            pods := testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, consensusComp)
+            time.Sleep(time.Second)
             By("Test the functions in ops_util.go")
-            opsRes.OpsRequest = createHorizontalScaling(clusterName, appsv1alpha1.HorizontalScaling{
-                ComponentOps: appsv1alpha1.ComponentOps{ComponentName: consensusComp},
-                Replicas:     pointer.Int32(1),
-            })
+            ops := testapps.NewOpsRequestObj("restart-ops-"+randomStr, testCtx.DefaultNamespace,
+                clusterName, appsv1alpha1.RestartType)
+            ops.Spec.RestartList = []appsv1alpha1.ComponentOps{{ComponentName: consensusComp}}
+            opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
             opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsRunningPhase
+            opsRes.OpsRequest.Status.StartTimestamp = metav1.Now()
 
             By("mock component failed")
             clusterComp := opsRes.Cluster.Status.Components[consensusComp]
@@ -103,19 +107,33 @@ var _ = Describe("OpsUtil functions", func() {
             opsRes.Cluster.Status.SetComponentStatus(consensusComp, clusterComp)
 
             By("expect for opsRequest is running")
+            handleRestartProgress := func(reqCtx intctrlutil.RequestCtx,
+                cli client.Client,
+                opsRes *OpsResource,
+                pgRes *progressResource,
+                compStatus *appsv1alpha1.OpsRequestComponentStatus) (expectProgressCount int32, completedCount int32, err error) {
+                return handleComponentStatusProgress(reqCtx, cli, opsRes, pgRes, compStatus,
+                    func(pod *corev1.Pod, inteface ComponentOpsInteface, opsStartTime metav1.Time, s string) bool {
+                        return !pod.CreationTimestamp.Before(&opsStartTime)
+                    })
+            }
+
             reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
-            compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
+            compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
             opsPhase, _, err := compOpsHelper.reconcileActionWithComponentOps(reqCtx, k8sClient, opsRes,
-                "test", handleComponentProgressForScalingReplicas)
+                "test", handleRestartProgress)
             Expect(err).Should(BeNil())
             Expect(opsPhase).Should(Equal(appsv1alpha1.OpsRunningPhase))
 
-            By("mock component failed time reaches the threshold, expect for opsRequest is Failed")
-            compStatus := opsRes.OpsRequest.Status.Components[consensusComp]
-            compStatus.LastFailedTime = metav1.Time{Time:
-                compStatus.LastFailedTime.Add(-1 * componentFailedTimeout).Add(-1 * time.Second)}
-            opsRes.OpsRequest.Status.Components[consensusComp] = compStatus
-            opsPhase, _, err = compOpsHelper.reconcileActionWithComponentOps(reqCtx, k8sClient, opsRes, "test", handleComponentProgressForScalingReplicas)
+            By("mock one pod recreates failed, expect for opsRequest is Failed")
+            testk8s.MockPodIsTerminating(ctx, testCtx, pods[2])
+            testk8s.RemovePodFinalizer(ctx, testCtx, pods[2])
+            // recreate it
+            pod := testapps.MockInstanceSetPod(&testCtx, nil, clusterName, consensusComp, pods[2].Name, "follower", "Readonly")
+            // mock pod is failed
+            testk8s.MockPodIsFailed(ctx, testCtx, pod)
+            opsPhase, _, err = compOpsHelper.reconcileActionWithComponentOps(reqCtx, k8sClient, opsRes, "test", handleRestartProgress)
             Expect(err).Should(BeNil())
             Expect(opsPhase).Should(Equal(appsv1alpha1.OpsFailedPhase))
         })
diff --git a/pkg/testutil/k8s/pod_util.go b/pkg/testutil/k8s/pod_util.go
index ee54389a2c0..44c9f100e6e 100644
--- a/pkg/testutil/k8s/pod_util.go
+++ b/pkg/testutil/k8s/pod_util.go
@@ -22,6 +22,7 @@ package testutil
 import (
     "context"
     "fmt"
+    "time"
 
     "github.com/onsi/gomega"
     corev1 "k8s.io/api/core/v1"
@@ -68,6 +69,28 @@ func MockPodIsTerminating(ctx context.Context, testCtx testutil.TestContext, pod
     }).Should(gomega.Succeed())
 }
 
+func MockPodIsFailed(ctx context.Context, testCtx testutil.TestContext, pod *corev1.Pod) {
+    patch := client.MergeFrom(pod.DeepCopy())
+    pod.Status.Conditions = []corev1.PodCondition{
+        {
+            Type:               corev1.ContainersReady,
+            Status:             corev1.ConditionFalse,
+            LastTransitionTime: metav1.Time{Time: time.Now().Add(-20 * time.Second)},
+        },
+    }
+    pod.Status.ContainerStatuses = []corev1.ContainerStatus{
+        {
+            Name: pod.Spec.Containers[0].Name,
+            State: corev1.ContainerState{
+                Terminated: &corev1.ContainerStateTerminated{
+                    ExitCode: 1,
+                },
+            },
+        },
+    }
+    gomega.Expect(testCtx.Cli.Status().Patch(ctx, pod, patch)).Should(gomega.Succeed())
+}
+
 // RemovePodFinalizer removes the pod finalizer to delete the pod finally.
 func RemovePodFinalizer(ctx context.Context, testCtx testutil.TestContext, pod *corev1.Pod) {
     patch := client.MergeFrom(pod.DeepCopy())
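
Reviewer note (illustration only, not part of the patch): the patch drops the LastFailedTime/componentFailedTimeout bookkeeping and instead derives per-component completion from progress counts plus the component phase. Below is a minimal standalone Go restatement of the new check in reconcileActionWithComponentOps; opsCompleted and componentTerminalPhases are hypothetical stand-ins for the inlined condition and appsv1alpha1.GetComponentTerminalPhases(), and the phase strings are illustrative.

    package main

    import (
        "fmt"
        "slices"
    )

    // componentTerminalPhases stands in for appsv1alpha1.GetComponentTerminalPhases();
    // the concrete values here are illustrative only.
    var componentTerminalPhases = []string{"Running", "Stopped", "Failed", "Abnormal"}

    // opsCompleted mirrors the negation of the patch's "opsIsCompleted = false"
    // condition: a component's ops work is considered done only when
    //  1. for ops that do not wait on the component phase, all expected
    //     progresses have completed,
    //  2. the component phase is a terminal phase, and
    //  3. at least one progress has completed.
    func opsCompleted(noWaitComponentCompleted bool, expectCount, completedCount int32, componentPhase string) bool {
        if noWaitComponentCompleted && expectCount != completedCount {
            return false // condition 1
        }
        if !slices.Contains(componentTerminalPhases, componentPhase) {
            return false // condition 2
        }
        return completedCount > 0 // condition 3
    }

    func main() {
        fmt.Println(opsCompleted(true, 3, 3, "Running"))  // true: all progresses done, phase terminal
        fmt.Println(opsCompleted(true, 3, 2, "Running"))  // false: a progress is still pending
        fmt.Println(opsCompleted(false, 3, 0, "Running")) // false: nothing completed yet
    }

This is why an h-scale ops that created its pod successfully no longer flips to Failed just because the component was momentarily Failed/Abnormal: failure is now decided by the progress details themselves (see existFailure).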
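Reviewer note (illustration only, not part of the patch): the new MockPodIsFailed helper stamps exactly the pod status that intctrlutil.IsPodFailedAndTimedOut is expected to classify as failed and timed out: ContainersReady=False with a LastTransitionTime 20 seconds in the past, plus a container terminated with a non-zero exit code. The sketch below rebuilds that status on a bare corev1.Pod and checks it with failedAndTimedOut, a hypothetical stand-in for the real predicate; the actual timeout inside intctrlutil is not shown in this patch, so the 10-second window is an assumption.

    package main

    import (
        "fmt"
        "time"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // failedAndTimedOut is a hypothetical stand-in for intctrlutil.IsPodFailedAndTimedOut:
    // failed when some container terminated with a non-zero exit code; timed out when
    // ContainersReady has been False for longer than the given window.
    func failedAndTimedOut(pod *corev1.Pod, window time.Duration) (isFailed, isTimeout bool) {
        for _, cs := range pod.Status.ContainerStatuses {
            if t := cs.State.Terminated; t != nil && t.ExitCode != 0 {
                isFailed = true
            }
        }
        for _, cond := range pod.Status.Conditions {
            if cond.Type == corev1.ContainersReady && cond.Status == corev1.ConditionFalse &&
                time.Since(cond.LastTransitionTime.Time) > window {
                isTimeout = true
            }
        }
        return isFailed, isTimeout
    }

    func main() {
        // The same status that MockPodIsFailed patches onto the pod:
        pod := &corev1.Pod{
            Status: corev1.PodStatus{
                Conditions: []corev1.PodCondition{{
                    Type:               corev1.ContainersReady,
                    Status:             corev1.ConditionFalse,
                    LastTransitionTime: metav1.Time{Time: time.Now().Add(-20 * time.Second)},
                }},
                ContainerStatuses: []corev1.ContainerStatus{{
                    State: corev1.ContainerState{
                        Terminated: &corev1.ContainerStateTerminated{ExitCode: 1},
                    },
                }},
            },
        }
        isFailed, isTimeout := failedAndTimedOut(pod, 10*time.Second)
        // Both true: handleFailedOrProcessingProgressDetail would mark the progress Failed.
        fmt.Println(isFailed, isTimeout)
    }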