Skip to content

Commit

Permalink
fix: workload upgrading from 0.5&0.6 to 0.7 (#5659)
Browse files Browse the repository at this point in the history
(cherry picked from commit 9f47174)
  • Loading branch information
free6om committed Oct 27, 2023
1 parent d80c4dd commit efaac5b
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
storagecontrollers "github.com/apecloud/kubeblocks/controllers/storage"
workloadscontrollers "github.com/apecloud/kubeblocks/controllers/workloads"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/rsm"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)
Expand Down Expand Up @@ -107,6 +108,7 @@ func init() {
viper.SetDefault(constant.FeatureGateReplicatedStateMachine, true)
viper.SetDefault(constant.KBDataScriptClientsImage, "apecloud/kubeblocks-datascript:latest")
viper.SetDefault(constant.KubernetesClusterDomainEnv, constant.DefaultDNSDomain)
viper.SetDefault(rsm.FeatureGateRSMCompatibilityMode, true)
}

type flagName string
Expand Down
12 changes: 12 additions & 0 deletions pkg/cli/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
)

const (
Expand Down Expand Up @@ -108,6 +109,8 @@ const (
KindOps = "OpsRequest"
KindBackupSchedule = "BackupSchedule"
KindBackupPolicyTemplate = "BackupPolicyTemplate"
KindStatefulSet = "StatefulSet"
KindRSM = "ReplicatedStateMachine"
)

// K8S rbac API group
Expand Down Expand Up @@ -185,6 +188,11 @@ const (
ResourceTpch = "tpches"
)

// Workload API group
const (
ResourceRSM = "replicatedstatemachines"
)

const (
None = "<none>"

Expand Down Expand Up @@ -311,6 +319,10 @@ func StatefulSetGVR() schema.GroupVersionResource {
return schema.GroupVersionResource{Group: appsv1.GroupName, Version: K8sCoreAPIVersion, Resource: ResourceStatefulSets}
}

func RSMGVR() schema.GroupVersionResource {
return schema.GroupVersionResource{Group: workloads.GroupVersion.Group, Version: workloads.GroupVersion.Version, Resource: ResourceRSM}
}

func DaemonSetGVR() schema.GroupVersionResource {
return schema.GroupVersionResource{Group: appsv1.GroupName, Version: K8sCoreAPIVersion, Resource: ResourceDaemonSets}
}
Expand Down
70 changes: 70 additions & 0 deletions pkg/cli/util/breakingchange/upgradehandlerto0.7.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ import (
"fmt"
"strings"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/dynamic"

dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/cli/types"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
)

var _ upgradeHandler = &upgradeHandlerTo7{}
Expand All @@ -57,6 +61,11 @@ func (u *upgradeHandlerTo7) snapshot(dynamic dynamic.Interface) (map[string][]un
if err := fillResourcesMap(dynamic, resourcesMap, types.BackupGVR()); err != nil {
return nil, err
}

// get stateful_set objs
if err := fillResourcesMap(dynamic, resourcesMap, types.StatefulSetGVR()); err != nil {
return nil, err
}
return resourcesMap, nil
}

Expand All @@ -72,6 +81,10 @@ func (u *upgradeHandlerTo7) transform(dynamic dynamic.Interface, resourcesMap ma
if err := u.transformBackup(dynamic, obj); err != nil {
return err
}
case types.KindStatefulSet:
if err := u.transformStatefulSet(dynamic, obj); err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -408,3 +421,60 @@ func (u *upgradeHandlerTo7) transformBackup(dynamic dynamic.Interface, obj unstr
}
return nil
}

func (u *upgradeHandlerTo7) transformStatefulSet(dynamic dynamic.Interface, obj unstructured.Unstructured) error {
// filter objects not managed by KB
labels := obj.GetLabels()
if labels == nil || labels[constant.AppManagedByLabelKey] != constant.AppName {
return nil
}
// create a rsm
serviceName, _, _ := unstructured.NestedString(obj.Object, "spec", "serviceName")
matchLabels, _, _ := unstructured.NestedStringMap(obj.Object, "spec", "selector", "matchLabels")
replicas, _, _ := unstructured.NestedInt64(obj.Object, "spec", "replicas")
podManagementPolicy, _, _ := unstructured.NestedString(obj.Object, "spec", "podManagementPolicy")
pvcsUnstructured, _, _ := unstructured.NestedSlice(obj.Object, "spec", "volumeClaimTemplates")
var pvcs []corev1.PersistentVolumeClaim
for _, pvcUnstructured := range pvcsUnstructured {
pvc := &corev1.PersistentVolumeClaim{}
pvcU, _ := pvcUnstructured.(map[string]interface{})
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(pvcU, pvc); err != nil {
return err
}
pvcs = append(pvcs, *pvc)
}
template, _, _ := unstructured.NestedMap(obj.Object, "spec", "template")
podTemplate := &corev1.PodTemplateSpec{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplate); err != nil {
return err
}
updateStrategy, _, _ := unstructured.NestedString(obj.Object, "spec", "updateStrategy")
rsm := builder.NewReplicatedStateMachineBuilder(obj.GetNamespace(), obj.GetName()).
AddAnnotationsInMap(obj.GetAnnotations()).
AddLabelsInMap(obj.GetLabels()).
SetServiceName(serviceName).
AddMatchLabelsInMap(matchLabels).
SetReplicas(int32(replicas)).
SetPodManagementPolicy(v1.PodManagementPolicyType(podManagementPolicy)).
SetVolumeClaimTemplates(pvcs...).
SetTemplate(*podTemplate).
SetUpdateStrategyType(v1.StatefulSetUpdateStrategyType(updateStrategy)).
GetObject()
gvk := schema.GroupVersionKind{
Group: types.RSMGVR().Group,
Version: types.RSMGVR().Version,
Kind: types.KindRSM,
}
rsm.SetGroupVersionKind(gvk)

unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(rsm)
if err != nil {
return err
}
_, err = dynamic.Resource(types.RSMGVR()).Namespace(obj.GetNamespace()).Create(context.TODO(),
&unstructured.Unstructured{Object: unstructuredMap}, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("create rsm %s failed: %s", rsm.Name, err.Error())
}
return nil
}
19 changes: 0 additions & 19 deletions pkg/controller/rsm/transformer_deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package rsm

import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

// ObjectDeletionTransformer handles object and its secondary resources' deletion
Expand All @@ -50,26 +47,10 @@ func (t *ObjectDeletionTransformer) Transform(ctx graph.TransformContext, dag *g
return err
}
for _, object := range snapshot {
if viper.GetBool(FeatureGateRSMCompatibilityMode) {
if !managedByRSM(object) {
continue
}
}
graphCli.Delete(dag, object)
}
graphCli.Delete(dag, obj)

// fast return, that is stopping the plan.Build() stage and jump to plan.Execute() directly
return graph.ErrPrematureStop
}

func managedByRSM(object client.Object) bool {
labels := object.GetLabels()
if labels == nil {
return false
}
if manager, ok := labels[workloadsManagedByLabelKey]; ok && manager == kindReplicatedStateMachine {
return true
}
return false
}
10 changes: 6 additions & 4 deletions pkg/controller/rsm/transformer_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ func (t *ObjectStatusTransformer) Transform(ctx graph.TransformContext, dag *gra
generation := rsm.Status.ObservedGeneration
rsm.Status.StatefulSetStatus = sts.Status
rsm.Status.ObservedGeneration = generation
currentGeneration, err := strconv.ParseInt(sts.Labels[rsmGenerationLabelKey], 10, 64)
if err != nil {
return err
if currentGenerationLabel, ok := sts.Labels[rsmGenerationLabelKey]; ok {
currentGeneration, err := strconv.ParseInt(currentGenerationLabel, 10, 64)
if err != nil {
return err
}
rsm.Status.CurrentGeneration = currentGeneration
}
rsm.Status.CurrentGeneration = currentGeneration
// read all pods belong to the sts, hence belong to the rsm
pods, err := getPodsOfStatefulSet(transCtx.Context, transCtx.Client, sts)
if err != nil {
Expand Down

0 comments on commit efaac5b

Please sign in to comment.