diff --git a/cmd/manager/main.go b/cmd/manager/main.go index d2bf90951fb..55b2e646034 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -57,6 +57,7 @@ import ( storagecontrollers "github.com/apecloud/kubeblocks/controllers/storage" workloadscontrollers "github.com/apecloud/kubeblocks/controllers/workloads" "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/controller/rsm" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" viper "github.com/apecloud/kubeblocks/pkg/viperx" ) @@ -107,6 +108,7 @@ func init() { viper.SetDefault(constant.FeatureGateReplicatedStateMachine, true) viper.SetDefault(constant.KBDataScriptClientsImage, "apecloud/kubeblocks-datascript:latest") viper.SetDefault(constant.KubernetesClusterDomainEnv, constant.DefaultDNSDomain) + viper.SetDefault(rsm.FeatureGateRSMCompatibilityMode, true) } type flagName string diff --git a/pkg/cli/types/types.go b/pkg/cli/types/types.go index 09c2b4c6ee8..37194734c4c 100644 --- a/pkg/cli/types/types.go +++ b/pkg/cli/types/types.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" ) const ( @@ -108,6 +109,8 @@ const ( KindOps = "OpsRequest" KindBackupSchedule = "BackupSchedule" KindBackupPolicyTemplate = "BackupPolicyTemplate" + KindStatefulSet = "StatefulSet" + KindRSM = "ReplicatedStateMachine" ) // K8S rbac API group @@ -185,6 +188,11 @@ const ( ResourceTpch = "tpches" ) +// Workload API group +const ( + ResourceRSM = "replicatedstatemachines" +) + const ( None = "" @@ -311,6 +319,10 @@ func StatefulSetGVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: appsv1.GroupName, Version: K8sCoreAPIVersion, Resource: ResourceStatefulSets} } +func RSMGVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: workloads.GroupVersion.Group, Version: workloads.GroupVersion.Version, Resource: ResourceRSM} +} + func DaemonSetGVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: appsv1.GroupName, Version: K8sCoreAPIVersion, Resource: ResourceDaemonSets} } diff --git a/pkg/cli/util/breakingchange/upgradehandlerto0.7.go b/pkg/cli/util/breakingchange/upgradehandlerto0.7.go index 229af4f2d87..bd24d2fe825 100644 --- a/pkg/cli/util/breakingchange/upgradehandlerto0.7.go +++ b/pkg/cli/util/breakingchange/upgradehandlerto0.7.go @@ -24,17 +24,21 @@ import ( "fmt" "strings" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/dynamic" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" "github.com/apecloud/kubeblocks/pkg/cli/types" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/controller/builder" ) var _ upgradeHandler = &upgradeHandlerTo7{} @@ -57,6 +61,11 @@ func (u *upgradeHandlerTo7) snapshot(dynamic dynamic.Interface) (map[string][]un if err := fillResourcesMap(dynamic, resourcesMap, types.BackupGVR()); err != nil { return nil, err } + + // get stateful_set objs + if err := fillResourcesMap(dynamic, resourcesMap, types.StatefulSetGVR()); err != nil { + return nil, err + } return resourcesMap, nil } @@ -72,6 +81,10 @@ func (u *upgradeHandlerTo7) transform(dynamic dynamic.Interface, resourcesMap ma if err := u.transformBackup(dynamic, obj); err != nil { return err } + case types.KindStatefulSet: + if err := u.transformStatefulSet(dynamic, obj); err != nil { + return err + } } } } @@ -408,3 +421,60 @@ func (u *upgradeHandlerTo7) transformBackup(dynamic dynamic.Interface, obj unstr } return nil } + +func (u *upgradeHandlerTo7) transformStatefulSet(dynamic dynamic.Interface, obj unstructured.Unstructured) error { + // filter objects not managed by KB + labels := obj.GetLabels() + if labels == nil || labels[constant.AppManagedByLabelKey] != constant.AppName { + return nil + } + // create a rsm + serviceName, _, _ := unstructured.NestedString(obj.Object, "spec", "serviceName") + matchLabels, _, _ := unstructured.NestedStringMap(obj.Object, "spec", "selector", "matchLabels") + replicas, _, _ := unstructured.NestedInt64(obj.Object, "spec", "replicas") + podManagementPolicy, _, _ := unstructured.NestedString(obj.Object, "spec", "podManagementPolicy") + pvcsUnstructured, _, _ := unstructured.NestedSlice(obj.Object, "spec", "volumeClaimTemplates") + var pvcs []corev1.PersistentVolumeClaim + for _, pvcUnstructured := range pvcsUnstructured { + pvc := &corev1.PersistentVolumeClaim{} + pvcU, _ := pvcUnstructured.(map[string]interface{}) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(pvcU, pvc); err != nil { + return err + } + pvcs = append(pvcs, *pvc) + } + template, _, _ := unstructured.NestedMap(obj.Object, "spec", "template") + podTemplate := &corev1.PodTemplateSpec{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplate); err != nil { + return err + } + updateStrategy, _, _ := unstructured.NestedString(obj.Object, "spec", "updateStrategy") + rsm := builder.NewReplicatedStateMachineBuilder(obj.GetNamespace(), obj.GetName()). + AddAnnotationsInMap(obj.GetAnnotations()). + AddLabelsInMap(obj.GetLabels()). + SetServiceName(serviceName). + AddMatchLabelsInMap(matchLabels). + SetReplicas(int32(replicas)). + SetPodManagementPolicy(v1.PodManagementPolicyType(podManagementPolicy)). + SetVolumeClaimTemplates(pvcs...). + SetTemplate(*podTemplate). + SetUpdateStrategyType(v1.StatefulSetUpdateStrategyType(updateStrategy)). + GetObject() + gvk := schema.GroupVersionKind{ + Group: types.RSMGVR().Group, + Version: types.RSMGVR().Version, + Kind: types.KindRSM, + } + rsm.SetGroupVersionKind(gvk) + + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(rsm) + if err != nil { + return err + } + _, err = dynamic.Resource(types.RSMGVR()).Namespace(obj.GetNamespace()).Create(context.TODO(), + &unstructured.Unstructured{Object: unstructuredMap}, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("create rsm %s failed: %s", rsm.Name, err.Error()) + } + return nil +} diff --git a/pkg/controller/rsm/transformer_deletion.go b/pkg/controller/rsm/transformer_deletion.go index 76dc8cce865..46b85dcf993 100644 --- a/pkg/controller/rsm/transformer_deletion.go +++ b/pkg/controller/rsm/transformer_deletion.go @@ -20,11 +20,8 @@ along with this program. If not, see . package rsm import ( - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/apecloud/kubeblocks/pkg/controller/graph" "github.com/apecloud/kubeblocks/pkg/controller/model" - viper "github.com/apecloud/kubeblocks/pkg/viperx" ) // ObjectDeletionTransformer handles object and its secondary resources' deletion @@ -50,11 +47,6 @@ func (t *ObjectDeletionTransformer) Transform(ctx graph.TransformContext, dag *g return err } for _, object := range snapshot { - if viper.GetBool(FeatureGateRSMCompatibilityMode) { - if !managedByRSM(object) { - continue - } - } graphCli.Delete(dag, object) } graphCli.Delete(dag, obj) @@ -62,14 +54,3 @@ func (t *ObjectDeletionTransformer) Transform(ctx graph.TransformContext, dag *g // fast return, that is stopping the plan.Build() stage and jump to plan.Execute() directly return graph.ErrPrematureStop } - -func managedByRSM(object client.Object) bool { - labels := object.GetLabels() - if labels == nil { - return false - } - if manager, ok := labels[workloadsManagedByLabelKey]; ok && manager == kindReplicatedStateMachine { - return true - } - return false -} diff --git a/pkg/controller/rsm/transformer_status.go b/pkg/controller/rsm/transformer_status.go index 0c57e115b17..57db0bec2a8 100644 --- a/pkg/controller/rsm/transformer_status.go +++ b/pkg/controller/rsm/transformer_status.go @@ -60,11 +60,13 @@ func (t *ObjectStatusTransformer) Transform(ctx graph.TransformContext, dag *gra generation := rsm.Status.ObservedGeneration rsm.Status.StatefulSetStatus = sts.Status rsm.Status.ObservedGeneration = generation - currentGeneration, err := strconv.ParseInt(sts.Labels[rsmGenerationLabelKey], 10, 64) - if err != nil { - return err + if currentGenerationLabel, ok := sts.Labels[rsmGenerationLabelKey]; ok { + currentGeneration, err := strconv.ParseInt(currentGenerationLabel, 10, 64) + if err != nil { + return err + } + rsm.Status.CurrentGeneration = currentGeneration } - rsm.Status.CurrentGeneration = currentGeneration // read all pods belong to the sts, hence belong to the rsm pods, err := getPodsOfStatefulSet(transCtx.Context, transCtx.Client, sts) if err != nil {