diff --git a/apis/apps/v1alpha1/opsrequest_types.go b/apis/apps/v1alpha1/opsrequest_types.go index f8dc4e2c540..e8bd655cd95 100644 --- a/apis/apps/v1alpha1/opsrequest_types.go +++ b/apis/apps/v1alpha1/opsrequest_types.go @@ -141,6 +141,28 @@ type SpecificOpsRequest struct { // +listMapKey=componentName VolumeExpansionList []VolumeExpansion `json:"volumeExpansion,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"` + // Lists Components to be started. If empty, all components will be started. + // + // +optional + // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.start" + // +kubebuilder:validation:MaxItems=1024 + // +patchMergeKey=componentName + // +patchStrategy=merge,retainKeys + // +listType=map + // +listMapKey=componentName + StartList []ComponentOps `json:"start,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"` + + // Lists Components to be stopped. If empty, all components will be stopped. + // + // +optional + // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.stop" + // +kubebuilder:validation:MaxItems=1024 + // +patchMergeKey=componentName + // +patchStrategy=merge,retainKeys + // +listType=map + // +listMapKey=componentName + StopList []ComponentOps `json:"stop,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"` + // Lists Components to be restarted. // // +optional diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 41c791bda3f..ca636a4293c 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -6263,6 +6263,16 @@ func (in *SpecificOpsRequest) DeepCopyInto(out *SpecificOpsRequest) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.StartList != nil { + in, out := &in.StartList, &out.StartList + *out = make([]ComponentOps, len(*in)) + copy(*out, *in) + } + if in.StopList != nil { + in, out := &in.StopList, &out.StopList + *out = make([]ComponentOps, len(*in)) + copy(*out, *in) + } if in.RestartList != nil { in, out := &in.RestartList, &out.RestartList *out = make([]ComponentOps, len(*in)) diff --git a/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml b/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml index 4463ec49f1b..2ad6c2f4a32 100644 --- a/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml +++ b/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml @@ -4723,6 +4723,48 @@ spec: required: - componentName type: object + start: + description: Lists Components to be started. If empty, all components + will be started. + items: + description: ComponentOps specifies the Component to be operated + on. + properties: + componentName: + description: Specifies the name of the Component. + type: string + required: + - componentName + type: object + maxItems: 1024 + type: array + x-kubernetes-list-map-keys: + - componentName + x-kubernetes-list-type: map + x-kubernetes-validations: + - message: forbidden to update spec.start + rule: self == oldSelf + stop: + description: Lists Components to be stopped. If empty, all components + will be stopped. + items: + description: ComponentOps specifies the Component to be operated + on. + properties: + componentName: + description: Specifies the name of the Component. 
+ type: string + required: + - componentName + type: object + maxItems: 1024 + type: array + x-kubernetes-list-map-keys: + - componentName + x-kubernetes-list-type: map + x-kubernetes-validations: + - message: forbidden to update spec.stop + rule: self == oldSelf switchover: description: Lists Switchover objects, each specifying a Component to perform the switchover operation. diff --git a/controllers/apps/operations/ops_comp_helper.go b/controllers/apps/operations/ops_comp_helper.go index 7329fe5f4f1..a0564e2717a 100644 --- a/controllers/apps/operations/ops_comp_helper.go +++ b/controllers/apps/operations/ops_comp_helper.go @@ -291,3 +291,12 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R } return appsv1alpha1.OpsSucceedPhase, 0, nil } + +func hasIntersectionCompOpsList[T ComponentOpsInterface, S ComponentOpsInterface](currCompOpsMap map[string]T, list []S) bool { + for _, comp := range list { + if _, ok := currCompOpsMap[comp.GetComponentName()]; ok { + return true + } + } + return false +} diff --git a/controllers/apps/operations/restart.go b/controllers/apps/operations/restart.go index f3039741841..e3ada1ab2b3 100644 --- a/controllers/apps/operations/restart.go +++ b/controllers/apps/operations/restart.go @@ -65,13 +65,14 @@ func (r restartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clie return fmt.Errorf("status.startTimestamp can not be null") } // abort earlier running vertical scaling opsRequest. + r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList) + // abort earlier running 'Restart' opsRequest. if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.RestartType}, func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) { - return true, nil + return hasIntersectionCompOpsList(r.compOpsHelper.componentOpsSet, earlierOps.Spec.RestartList), nil }); err != nil { return err } - r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList) componentKindList := []client.ObjectList{ &appv1.StatefulSetList{}, &workloads.InstanceSetList{}, diff --git a/controllers/apps/operations/restart_test.go b/controllers/apps/operations/restart_test.go index 20b6bd91161..3078bb39a28 100644 --- a/controllers/apps/operations/restart_test.go +++ b/controllers/apps/operations/restart_test.go @@ -110,12 +110,20 @@ var _ = Describe("Restart OpsRequest", func() { }) }) -func createRestartOpsObj(clusterName, restartOpsName string) *appsv1alpha1.OpsRequest { +func createRestartOpsObj(clusterName, restartOpsName string, componentNames ...string) *appsv1alpha1.OpsRequest { ops := testapps.NewOpsRequestObj(restartOpsName, testCtx.DefaultNamespace, clusterName, appsv1alpha1.RestartType) - ops.Spec.RestartList = []appsv1alpha1.ComponentOps{ - {ComponentName: consensusComp}, - {ComponentName: statelessComp}, + if len(componentNames) == 0 { + ops.Spec.RestartList = []appsv1alpha1.ComponentOps{ + {ComponentName: consensusComp}, + {ComponentName: statelessComp}, + } + } else { + for _, compName := range componentNames { + ops.Spec.RestartList = append(ops.Spec.RestartList, appsv1alpha1.ComponentOps{ + ComponentName: compName, + }) + } } opsRequest := testapps.CreateOpsRequest(ctx, testCtx, ops) opsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase diff --git a/controllers/apps/operations/start.go b/controllers/apps/operations/start.go index a8134109c34..1202691f6f3 100644 --- a/controllers/apps/operations/start.go +++ b/controllers/apps/operations/start.go @@ -35,15 +35,16 @@ type 
StartOpsHandler struct{} var _ OpsHandler = StartOpsHandler{} func init() { - stopBehaviour := OpsBehaviour{ - FromClusterPhases: []appsv1alpha1.ClusterPhase{appsv1alpha1.StoppedClusterPhase}, - ToClusterPhase: appsv1alpha1.UpdatingClusterPhase, - QueueByCluster: true, - OpsHandler: StartOpsHandler{}, + startBehaviour := OpsBehaviour{ + FromClusterPhases: append(appsv1alpha1.GetClusterUpRunningPhases(), appsv1alpha1.UpdatingClusterPhase, + appsv1alpha1.StoppedClusterPhase, appsv1alpha1.StoppingClusterPhase), + ToClusterPhase: appsv1alpha1.UpdatingClusterPhase, + QueueByCluster: true, + OpsHandler: StartOpsHandler{}, } opsMgr := GetOpsManager() - opsMgr.RegisterOps(appsv1alpha1.StartType, stopBehaviour) + opsMgr.RegisterOps(appsv1alpha1.StartType, startBehaviour) } // ActionStartedCondition the started condition when handling the start request. @@ -55,15 +56,33 @@ func (start StartOpsHandler) ActionStartedCondition(reqCtx intctrlutil.RequestCt func (start StartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error { var ( cluster = opsRes.Cluster - startComp = func(compSpec *appsv1alpha1.ClusterComponentSpec) { - compSpec.Stop = nil - } + startList = opsRes.OpsRequest.Spec.StartList ) - for i := range cluster.Spec.ComponentSpecs { - startComp(&cluster.Spec.ComponentSpecs[i]) + compOpsHelper := newComponentOpsHelper(startList) + // abort earlier running opsRequests. + if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.StopType}, + func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) { + if len(startList) == 0 { + // start all components + return true, nil + } + return len(earlierOps.Spec.StopList) == 0 || hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.StopList), nil + }); err != nil { + return err + } + startComp := func(compSpec *appsv1alpha1.ClusterComponentSpec, clusterCompName string) { + if len(startList) > 0 { + if _, ok := compOpsHelper.componentOpsSet[clusterCompName]; !ok { + return + } + } + compSpec.Stop = nil + } + for i, v := range cluster.Spec.ComponentSpecs { + startComp(&cluster.Spec.ComponentSpecs[i], v.Name) } - for i := range cluster.Spec.ShardingSpecs { - startComp(&cluster.Spec.ShardingSpecs[i].Template) + for i, v := range cluster.Spec.ShardingSpecs { + startComp(&cluster.Spec.ShardingSpecs[i].Template, v.Name) } return cli.Update(reqCtx.Ctx, cluster) } @@ -84,7 +103,7 @@ func (start StartOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli } return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus) } - compOpsHelper := newComponentOpsHelper([]appsv1alpha1.ComponentOps{}) + compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.StartList) return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "start", handleComponentProgress) } diff --git a/controllers/apps/operations/start_test.go b/controllers/apps/operations/start_test.go index e421a7015e2..82a77ad8e12 100644 --- a/controllers/apps/operations/start_test.go +++ b/controllers/apps/operations/start_test.go @@ -22,6 +22,7 @@ package operations import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" @@ -38,6 +39,7 @@ var _ = Describe("Start OpsRequest", func() { clusterDefinitionName = "cluster-definition-for-ops-" + randomStr clusterVersionName = "clusterversion-for-ops-" + randomStr clusterName = "cluster-for-ops-" + randomStr + clusterDefName = "test-clusterdef-" + randomStr ) cleanEnv := func() { @@ -48,7 +50,7 @@ var _ = Describe("Start OpsRequest", func() { By("clean resources") // delete cluster(and all dependent sub-resources), clusterversion and clusterdef - testapps.ClearClusterResources(&testCtx) + testapps.ClearClusterResourcesWithRemoveFinalizerOption(&testCtx) // delete rest resources inNS := client.InNamespace(testCtx.DefaultNamespace) @@ -63,6 +65,23 @@ var _ = Describe("Start OpsRequest", func() { AfterEach(cleanEnv) Context("Test OpsRequest", func() { + createStartOpsRequest := func(opsRes *OpsResource, startCompNames ...string) *appsv1alpha1.OpsRequest { + By("create Start opsRequest") + ops := testapps.NewOpsRequestObj("start-ops-"+testCtx.GetRandomStr(), testCtx.DefaultNamespace, + clusterName, appsv1alpha1.StartType) + var startList []appsv1alpha1.ComponentOps + for _, startCompName := range startCompNames { + startList = append(startList, appsv1alpha1.ComponentOps{ + ComponentName: startCompName, + }) + } + ops.Spec.StartList = startList + opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops) + // set ops phase to Pending + opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase + return ops + } + It("Test start OpsRequest", func() { By("init operations resources ") reqCtx := intctrlutil.RequestCtx{Ctx: ctx} @@ -70,10 +89,8 @@ var _ = Describe("Start OpsRequest", func() { testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp) testapps.MockInstanceSetComponent(&testCtx, clusterName, statelessComp) testapps.MockInstanceSetComponent(&testCtx, clusterName, statefulComp) - By("create Start opsRequest") - ops := testapps.NewOpsRequestObj("start-ops-"+randomStr, testCtx.DefaultNamespace, - clusterName, appsv1alpha1.StartType) - opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops) + By("create 'Start' opsRequest") + createStartOpsRequest(opsRes) By("test start action and reconcile function") Expect(opsutil.UpdateClusterOpsAnnotations(ctx, k8sClient, opsRes.Cluster, nil)).Should(Succeed()) @@ -83,12 +100,10 @@ var _ = Describe("Start OpsRequest", func() { })).ShouldNot(HaveOccurred()) // set ops phase to Pending - opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase - _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) - Expect(err).ShouldNot(HaveOccurred()) + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsCreatingPhase)) // do start action - _, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes) + _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) for _, v := range opsRes.Cluster.Spec.ComponentSpecs { Expect(v.Stop).Should(BeNil()) @@ -96,5 +111,70 @@ var _ = Describe("Start OpsRequest", func() { _, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) Expect(err == nil).Should(BeTrue()) }) + It("Test start specific components OpsRequest", func() { + By("init operations resources with topology") + opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName) + // mock components is stopped + 
Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(pobj *appsv1alpha1.Cluster) { + for i := range pobj.Spec.ComponentSpecs { + pobj.Spec.ComponentSpecs[i].Stop = pointer.Bool(true) + } + })).Should(Succeed()) + + By("create 'Start' opsRequest for specific components") + createStartOpsRequest(opsRes, defaultCompName) + + By("mock 'Start' OpsRequest to Creating phase") + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) + + By("test start action") + startHandler := StartOpsHandler{} + err := startHandler.Action(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + + By("verify components are being started") + Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.Cluster), func(g Gomega, pobj *appsv1alpha1.Cluster) { + for _, v := range pobj.Spec.ComponentSpecs { + if v.Name == defaultCompName { + Expect(v.Stop).Should(BeNil()) + } else { + Expect(v.Stop).ShouldNot(BeNil()) + Expect(*v.Stop).Should(BeTrue()) + } + } + })).Should(Succeed()) + + By("mock components start successfully") + testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, defaultCompName) + testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName) + + By("test reconcile") + _, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + + By("verify ops request completed") + Eventually(testapps.GetOpsRequestPhase(&testCtx, + client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsSucceedPhase)) + }) + + It("Test abort running 'Stop' opsRequest", func() { + By("init operations resources with topology") + opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + + By("create 'Stop' opsRequest for all components") + stopOps := createStopOpsRequest(opsRes, defaultCompName) + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) + + By("create a start opsRequest") + createStartOpsRequest(opsRes, defaultCompName) + startHandler := StartOpsHandler{} + err := startHandler.Action(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + + By("expect the 'Stop' OpsRequest to be Aborted") + Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(stopOps))).Should(Equal(appsv1alpha1.OpsAbortedPhase)) + }) }) }) diff --git a/controllers/apps/operations/stop.go b/controllers/apps/operations/stop.go index 2ea2b68adf6..176b4a4a08d 100644 --- a/controllers/apps/operations/stop.go +++ b/controllers/apps/operations/stop.go @@ -24,6 +24,7 @@ import ( "golang.org/x/exp/slices" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" @@ -55,7 +56,8 @@ func (stop StopOpsHandler) ActionStartedCondition(reqCtx intctrlutil.RequestCtx, // Action modifies Cluster.spec.components[*].replicas from the opsRequest func (stop StopOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error { var ( - cluster = opsRes.Cluster + cluster = opsRes.Cluster + stopList = opsRes.OpsRequest.Spec.StopList ) // if the cluster is already stopping or stopped, return @@ -64,24 +66,46 @@ func (stop StopOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clie return nil } + compOpsHelper := newComponentOpsHelper(stopList) + // abort earlier running opsRequests. + // abort earlier running vertical scaling opsRequest. 
if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.HorizontalScalingType, appsv1alpha1.StartType, appsv1alpha1.RestartType, appsv1alpha1.VerticalScalingType}, func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) { - return true, nil + if len(stopList) == 0 { + // stop all components + return true, nil + } + switch earlierOps.Spec.Type { + case appsv1alpha1.RestartType: + return hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.RestartList), nil + case appsv1alpha1.VerticalScalingType: + return hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.VerticalScalingList), nil + case appsv1alpha1.HorizontalScalingType: + return hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.HorizontalScalingList), nil + case appsv1alpha1.StartType: + return len(earlierOps.Spec.StartList) == 0 || hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.StartList), nil + } + return false, nil }); err != nil { return err } - stopComp := func(compSpec *appsv1alpha1.ClusterComponentSpec) { - compSpec.Stop = func() *bool { b := true; return &b }() + stopComp := func(compSpec *appsv1alpha1.ClusterComponentSpec, clusterCompName string) { + if len(stopList) > 0 { + if _, ok := compOpsHelper.componentOpsSet[clusterCompName]; !ok { + return + } + } + compSpec.Stop = pointer.Bool(true) } - for i := range cluster.Spec.ComponentSpecs { - stopComp(&cluster.Spec.ComponentSpecs[i]) + for i, v := range cluster.Spec.ComponentSpecs { + stopComp(&cluster.Spec.ComponentSpecs[i], v.Name) } - for i := range cluster.Spec.ShardingSpecs { - stopComp(&cluster.Spec.ShardingSpecs[i].Template) + for i, v := range cluster.Spec.ShardingSpecs { + stopComp(&cluster.Spec.ShardingSpecs[i].Template, v.Name) } return cli.Update(reqCtx.Ctx, cluster) } @@ -106,7 +130,7 @@ func (stop StopOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli cl } return expectProgressCount, completedCount, nil } - compOpsHelper := newComponentOpsHelper([]appsv1alpha1.ComponentOps{}) + compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.StopList) return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "stop", handleComponentProgress) } diff --git a/controllers/apps/operations/stop_test.go b/controllers/apps/operations/stop_test.go index 5b43d075942..9aac1ff3767 100644 --- a/controllers/apps/operations/stop_test.go +++ b/controllers/apps/operations/stop_test.go @@ -29,6 +29,7 @@ import ( intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" "github.com/apecloud/kubeblocks/pkg/generics" testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps" + testk8s "github.com/apecloud/kubeblocks/pkg/testutil/k8s" ) var _ = Describe("Stop OpsRequest", func() { @@ -37,6 +38,7 @@ var _ = Describe("Stop OpsRequest", func() { clusterDefinitionName = "cluster-definition-for-ops-" + randomStr clusterVersionName = "clusterversion-for-ops-" + randomStr clusterName = "cluster-for-ops-" + randomStr + clusterDefName = "test-clusterdef-" + randomStr ) cleanEnv := func() { @@ -47,7 +49,7 @@ var _ = Describe("Stop OpsRequest", func() { By("clean resources") // delete cluster(and all dependent sub-resources), clusterversion and clusterdef - testapps.ClearClusterResources(&testCtx) + testapps.ClearClusterResourcesWithRemoveFinalizerOption(&testCtx) // delete rest resources inNS := client.InNamespace(testCtx.DefaultNamespace) @@ -55,6 +57,8 @@ var _ = Describe("Stop OpsRequest", func() { // namespaced 
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.InstanceSetSignature, true, inNS, ml) testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml) + // default GracePeriod is 30s + testapps.ClearResources(&testCtx, generics.PodSignature, inNS, ml, client.GracePeriodSeconds(0)) } BeforeEach(cleanEnv) @@ -68,20 +72,13 @@ var _ = Describe("Stop OpsRequest", func() { testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp) testapps.MockInstanceSetComponent(&testCtx, clusterName, statelessComp) testapps.MockInstanceSetComponent(&testCtx, clusterName, statefulComp) - By("create Stop opsRequest") - ops := testapps.NewOpsRequestObj("stop-ops-"+randomStr, testCtx.DefaultNamespace, - clusterName, appsv1alpha1.StopType) - opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops) - // set ops phase to Pending - opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase - - By("test stop action and reconcile function") - // update ops phase to running first - _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) - Expect(err).ShouldNot(HaveOccurred()) - Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsCreatingPhase)) + By("create 'Stop' opsRequest") + createStopOpsRequest(opsRes) + + By("test top action and reconcile function") + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) // do stop cluster - _, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes) + _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) for _, v := range opsRes.Cluster.Spec.ComponentSpecs { Expect(v.Stop).ShouldNot(BeNil()) @@ -90,5 +87,108 @@ var _ = Describe("Stop OpsRequest", func() { _, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) Expect(err == nil).Should(BeTrue()) }) + + It("Test stop specific components OpsRequest", func() { + By("init operations resources with topology") + opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName) + pods := testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, defaultCompName) + + By("create 'Stop' opsRequest for specific components") + createStopOpsRequest(opsRes, defaultCompName) + + By("mock 'Stop' OpsRequest to Creating phase") + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) + + By("test stop action") + stopHandler := StopOpsHandler{} + err := stopHandler.Action(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + + By("verify components are being stopped") + Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.Cluster), func(g Gomega, pobj *appsv1alpha1.Cluster) { + for _, v := range pobj.Spec.ComponentSpecs { + if v.Name == defaultCompName { + Expect(v.Stop).ShouldNot(BeNil()) + Expect(*v.Stop).Should(BeTrue()) + } else { + Expect(v.Stop).Should(BeNil()) + } + } + })).Should(Succeed()) + + By("mock components stopped successfully") + for i := range pods { + testk8s.MockPodIsTerminating(ctx, testCtx, pods[i]) + testk8s.RemovePodFinalizer(ctx, testCtx, pods[i]) + } + testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName) + + By("test reconcile") + _, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + + By("verify ops request completed") + Eventually(testapps.GetOpsRequestPhase(&testCtx, + client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsSucceedPhase)) + }) + + 
It("Test abort other running opsRequests", func() { + By("init operations resources with topology") + opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + + By("create a 'Restart' opsRequest with intersection component") + ops1 := createRestartOpsObj(clusterName, "restart-ops"+randomStr, defaultCompName) + opsRes.OpsRequest = ops1 + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) + + By("create a 'Restart' opsRequest with non-intersection component") + ops2 := createRestartOpsObj(clusterName, "restart-ops2"+randomStr, secondaryCompName) + ops2.Spec.Force = true + opsRes.OpsRequest = ops2 + runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase) + + By("create a 'Start' opsRequest") + ops3 := testapps.CreateOpsRequest(ctx, testCtx, testapps.NewOpsRequestObj("start-ops-"+randomStr, testCtx.DefaultNamespace, + clusterName, appsv1alpha1.StartType)) + opsRes.OpsRequest = ops3 + Expect(testapps.ChangeObjStatus(&testCtx, ops3, func() { + ops3.Status.Phase = appsv1alpha1.OpsPendingPhase + })).Should(Succeed()) + runAction(reqCtx, opsRes, appsv1alpha1.OpsPendingPhase) + + By("create 'Stop' opsRequest for all components") + createStopOpsRequest(opsRes, defaultCompName) + stopHandler := StopOpsHandler{} + err := stopHandler.Action(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + + By("expect the 'Restart' opsRequest with intersection component to be Aborted") + Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops1))).Should(Equal(appsv1alpha1.OpsAbortedPhase)) + + By("expect the 'Restart' opsRequest with non-intersection component to be Creating") + Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops2))).Should(Equal(appsv1alpha1.OpsCreatingPhase)) + + By("expect the 'Start' opsRequest to be Aborted") + Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops1))).Should(Equal(appsv1alpha1.OpsAbortedPhase)) + }) }) }) + +func createStopOpsRequest(opsRes *OpsResource, stopCompNames ...string) *appsv1alpha1.OpsRequest { + By("create Stop opsRequest") + ops := testapps.NewOpsRequestObj("stop-ops-"+testCtx.GetRandomStr(), testCtx.DefaultNamespace, + opsRes.Cluster.Name, appsv1alpha1.StopType) + var stopList []appsv1alpha1.ComponentOps + for _, stopCompName := range stopCompNames { + stopList = append(stopList, appsv1alpha1.ComponentOps{ + ComponentName: stopCompName, + }) + } + ops.Spec.StopList = stopList + opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops) + // set ops phase to Pending + opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase + return ops +} diff --git a/controllers/apps/operations/suite_test.go b/controllers/apps/operations/suite_test.go index 519a1defe0e..12e14a5cda4 100644 --- a/controllers/apps/operations/suite_test.go +++ b/controllers/apps/operations/suite_test.go @@ -68,6 +68,10 @@ const ( statelessComp = "stateless" statefulComp = "stateful" consensusComp = "consensus" + + defaultCompName = "default" + secondaryCompName = "secondary" + thirdCompName = "third" ) func init() { @@ -179,6 +183,73 @@ func initOperationsResources(clusterDefinitionName, return opsRes, clusterDef, clusterObject } +func initOperationsResourcesWithTopology(clusterDefName, compDefName, clusterName string) (*OpsResource, *appsv1alpha1.ComponentDefinition, *appsv1alpha1.Cluster) { + topologyName := "cluster-mode" + testapps.NewClusterDefFactory(clusterDefName). 
+ AddClusterTopology(appsv1alpha1.ClusterTopology{ + Name: topologyName, + Orders: &appsv1alpha1.ClusterTopologyOrders{ + Update: []string{ + defaultCompName, + secondaryCompName, + thirdCompName, + }, + }, + Components: []appsv1alpha1.ClusterTopologyComponent{ + {Name: defaultCompName, CompDef: compDefName}, + {Name: secondaryCompName, CompDef: compDefName}, + {Name: thirdCompName, CompDef: compDefName}, + }, + }). + Create(&testCtx). + GetObject() + + compDef := testapps.NewComponentDefinitionFactory(compDefName). + SetDefaultSpec(). + Create(&testCtx). + GetObject() + + pvcSpec := testapps.NewPVCSpec("1Gi") + clusterObject := testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterName, clusterDefName, ""). + AddComponent(defaultCompName, compDef.GetName()). + SetReplicas(3). + AddVolumeClaimTemplate(testapps.DataVolumeName, pvcSpec). + AddComponent(secondaryCompName, compDef.GetName()). + SetReplicas(3). + AddComponent(thirdCompName, compDef.GetName()). + SetReplicas(3). + SetTopology(topologyName). + Create(&testCtx). + GetObject() + + opsRes := &OpsResource{ + Cluster: clusterObject, + Recorder: k8sManager.GetEventRecorderFor("opsrequest-controller"), + } + + By("mock cluster is Running and the status operations") + Expect(testapps.ChangeObjStatus(&testCtx, clusterObject, func() { + clusterObject.Status.Phase = appsv1alpha1.RunningClusterPhase + clusterObject.Status.Components = map[string]appsv1alpha1.ClusterComponentStatus{ + defaultCompName: { + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + secondaryCompName: { + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + thirdCompName: { + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + } + })).Should(Succeed()) + opsRes.Cluster = clusterObject + + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, secondaryCompName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, thirdCompName) + return opsRes, compDef, clusterObject +} + func initInstanceSetPods(ctx context.Context, cli client.Client, opsRes *OpsResource) []corev1.Pod { // mock the pods of consensusSet component testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, consensusComp) @@ -199,3 +270,9 @@ func mockComponentIsOperating(cluster *appsv1alpha1.Cluster, expectPhase appsv1a } })).Should(Succeed()) } + +func runAction(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, expectPhase appsv1alpha1.OpsPhase) { + _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) + Expect(err).ShouldNot(HaveOccurred()) + Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(expectPhase)) +} diff --git a/controllers/apps/transformer_cluster_status.go b/controllers/apps/transformer_cluster_status.go index 077041ee13f..dab2c385d6b 100644 --- a/controllers/apps/transformer_cluster_status.go +++ b/controllers/apps/transformer_cluster_status.go @@ -82,13 +82,13 @@ func (t *clusterStatusTransformer) markClusterDagStatusAction(graphCli model.Gra func (t *clusterStatusTransformer) reconcileClusterPhase(cluster *appsv1alpha1.Cluster) { var ( - isAllComponentCreating = true - isAllComponentRunning = true - isAllComponentWorking = true - hasComponentStopping = false - isAllComponentStopped = true - isAllComponentFailed = true - hasComponentAbnormalOrFailed = false + isAllComponentCreating = true + isAllComponentRunningOrStopped = true + isAllComponentWorking = true + hasComponentStopping = false + isAllComponentStopped = true + 
isAllComponentFailed = true + hasComponentAbnormalOrFailed = false ) isPhaseIn := func(phase appsv1alpha1.ClusterComponentPhase, phases ...appsv1alpha1.ClusterComponentPhase) bool { for _, p := range phases { @@ -103,8 +103,8 @@ func (t *clusterStatusTransformer) reconcileClusterPhase(cluster *appsv1alpha1.C if !isPhaseIn(phase, appsv1alpha1.CreatingClusterCompPhase) { isAllComponentCreating = false } - if !isPhaseIn(phase, appsv1alpha1.RunningClusterCompPhase) { - isAllComponentRunning = false + if !isPhaseIn(phase, appsv1alpha1.RunningClusterCompPhase, appsv1alpha1.StoppedClusterCompPhase) { + isAllComponentRunningOrStopped = false } if !isPhaseIn(phase, appsv1alpha1.CreatingClusterCompPhase, appsv1alpha1.RunningClusterCompPhase, @@ -126,7 +126,11 @@ func (t *clusterStatusTransformer) reconcileClusterPhase(cluster *appsv1alpha1.C } switch { - case isAllComponentRunning: + case isAllComponentStopped: + if cluster.Status.Phase != appsv1alpha1.StoppedClusterPhase { + t.syncClusterPhaseToStopped(cluster) + } + case isAllComponentRunningOrStopped: if cluster.Status.Phase != appsv1alpha1.RunningClusterPhase { t.syncClusterPhaseToRunning(cluster) } @@ -134,10 +138,6 @@ func (t *clusterStatusTransformer) reconcileClusterPhase(cluster *appsv1alpha1.C cluster.Status.Phase = appsv1alpha1.CreatingClusterPhase case isAllComponentWorking: cluster.Status.Phase = appsv1alpha1.UpdatingClusterPhase - case isAllComponentStopped: - if cluster.Status.Phase != appsv1alpha1.StoppedClusterPhase { - t.syncClusterPhaseToStopped(cluster) - } case hasComponentStopping: cluster.Status.Phase = appsv1alpha1.StoppingClusterPhase case isAllComponentFailed: diff --git a/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml b/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml index 4463ec49f1b..2ad6c2f4a32 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml @@ -4723,6 +4723,48 @@ spec: required: - componentName type: object + start: + description: Lists Components to be started. If empty, all components + will be started. + items: + description: ComponentOps specifies the Component to be operated + on. + properties: + componentName: + description: Specifies the name of the Component. + type: string + required: + - componentName + type: object + maxItems: 1024 + type: array + x-kubernetes-list-map-keys: + - componentName + x-kubernetes-list-type: map + x-kubernetes-validations: + - message: forbidden to update spec.start + rule: self == oldSelf + stop: + description: Lists Components to be stopped. If empty, all components + will be stopped. + items: + description: ComponentOps specifies the Component to be operated + on. + properties: + componentName: + description: Specifies the name of the Component. + type: string + required: + - componentName + type: object + maxItems: 1024 + type: array + x-kubernetes-list-map-keys: + - componentName + x-kubernetes-list-type: map + x-kubernetes-validations: + - message: forbidden to update spec.stop + rule: self == oldSelf switchover: description: Lists Switchover objects, each specifying a Component to perform the switchover operation. diff --git a/docs/developer_docs/api-reference/cluster.md b/docs/developer_docs/api-reference/cluster.md index c6ac8003a33..8e4e710653f 100644 --- a/docs/developer_docs/api-reference/cluster.md +++ b/docs/developer_docs/api-reference/cluster.md @@ -20714,6 +20714,34 @@ that requires storage expansion.
 </p>
 </td>
 </tr>
+<tr>
+<td>
+<code>start</code><br/>
+<em>
+<a href="#apps.kubeblocks.io/v1alpha1.ComponentOps">
+[]ComponentOps
+</a>
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>Lists Components to be started. If empty, all components will be started.</p>
+</td>
+</tr>
+<tr>
+<td>
+<code>stop</code><br/>
+<em>
+<a href="#apps.kubeblocks.io/v1alpha1.ComponentOps">
+[]ComponentOps
+</a>
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>Lists Components to be stopped. If empty, all components will be stopped.</p>
+</td>
+</tr>
 <tr>
 <td>
 <code>restart</code><br/>
diff --git a/pkg/testutil/apps/cluster_instance_set_test_util.go b/pkg/testutil/apps/cluster_instance_set_test_util.go index 068c5404200..c4353644706 100644 --- a/pkg/testutil/apps/cluster_instance_set_test_util.go +++ b/pkg/testutil/apps/cluster_instance_set_test_util.go @@ -329,7 +329,9 @@ func MockInstanceSetStatus(testCtx testutil.TestContext, cluster *appsv1alpha1.C CanVote: true, }, } - if memberStatus.ReplicaRole.AccessMode == workloads.ReadWriteMode { + if memberStatus.ReplicaRole.AccessMode == "" { + memberStatus.ReplicaRole.AccessMode = workloads.NoneMode + } else if memberStatus.ReplicaRole.AccessMode == workloads.ReadWriteMode { memberStatus.ReplicaRole.IsLeader = true } newMembersStatus = append(newMembersStatus, memberStatus)
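Reviewer note (not part of the patch): a minimal usage sketch of the new `spec.start` / `spec.stop` fields. `type: Start`/`Stop` and the `componentName` list items come from the CRD change above; the cluster reference field name (`clusterName`), the namespace, and all object/component names are illustrative assumptions.

```yaml
# Sketch only: stop a single component of a cluster, then start it again.
# Assumed: the OpsRequest spec references its cluster via `clusterName`;
# names and namespace are placeholders.
apiVersion: apps.kubeblocks.io/v1alpha1
kind: OpsRequest
metadata:
  name: mycluster-stop-secondary        # hypothetical name
  namespace: default
spec:
  clusterName: mycluster                # assumed cluster reference field
  type: Stop
  stop:
  - componentName: secondary            # only this component is stopped; omit `stop` to stop all components
---
apiVersion: apps.kubeblocks.io/v1alpha1
kind: OpsRequest
metadata:
  name: mycluster-start-secondary       # hypothetical name
  namespace: default
spec:
  clusterName: mycluster                # assumed cluster reference field
  type: Start
  start:
  - componentName: secondary            # omit `start` to start all stopped components
```

Per the updated abort logic in stop.go and start.go, a component-scoped Stop like the one above aborts earlier HorizontalScaling, VerticalScaling, Restart, or Start requests only when their component lists intersect with `spec.stop`, and a Start likewise aborts earlier Stop requests; an empty list is treated as covering all components.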