Skip to content

Commit

Permalink
fix: restore/backup exec action failed (#5805)
Browse files Browse the repository at this point in the history
(cherry picked from commit 8691103)
  • Loading branch information
wangyelei committed Nov 9, 2023
1 parent 71322d9 commit 75d94ce
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 71 deletions.
18 changes: 10 additions & 8 deletions controllers/apps/transformer_backup_policy_tpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,16 @@ func (r *BackupPolicyTplTransformer) syncBackupPolicy(backupPolicy *dpv1alpha1.B
return
}

podSelector := backupPolicy.Spec.Target.PodSelector
if podSelector.LabelSelector == nil || podSelector.LabelSelector.MatchLabels == nil {
podSelector.LabelSelector = &metav1.LabelSelector{MatchLabels: map[string]string{}}
}
if r.getCompReplicas() == 1 {
delete(podSelector.LabelSelector.MatchLabels, constant.RoleLabelKey)
} else {
podSelector.LabelSelector.MatchLabels[constant.RoleLabelKey] = role
if backupPolicy.Spec.Target != nil {
podSelector := backupPolicy.Spec.Target.PodSelector
if podSelector.LabelSelector == nil || podSelector.LabelSelector.MatchLabels == nil {
podSelector.LabelSelector = &metav1.LabelSelector{MatchLabels: map[string]string{}}
}
if r.getCompReplicas() == 1 {
delete(podSelector.LabelSelector.MatchLabels, constant.RoleLabelKey)
} else {
podSelector.LabelSelector.MatchLabels[constant.RoleLabelKey] = role
}
}
}

Expand Down
53 changes: 26 additions & 27 deletions controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
Expand All @@ -42,7 +43,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
Expand Down Expand Up @@ -128,7 +131,8 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(maxConcurDataProtectionReconKey),
}).
Owns(&batchv1.Job{})
Owns(&batchv1.Job{}).
Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(r.parseBackupJob))

if intctrlutil.InVolumeSnapshotV1Beta1() {
b.Owns(&vsv1beta1.VolumeSnapshot{}, builder.Predicates{})
Expand All @@ -138,6 +142,22 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
return b.Complete(r)
}

func (r *BackupReconciler) parseBackupJob(ctx context.Context, object client.Object) []reconcile.Request {
job := object.(*batchv1.Job)
var requests []reconcile.Request
backupName := job.Labels[dptypes.BackupNameLabelKey]
backupNamespace := job.Labels[dptypes.BackupNamespaceLabelKey]
if backupName != "" && backupNamespace != "" {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: backupNamespace,
Name: backupName,
},
})
}
return requests
}

// deleteBackupFiles deletes the backup files stored in backup repository.
func (r *BackupReconciler) deleteBackupFiles(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
deleteBackup := func() error {
Expand Down Expand Up @@ -408,6 +428,7 @@ func (r *BackupReconciler) patchBackupObjectMeta(

request.Labels[constant.AppManagedByLabelKey] = constant.AppName
request.Labels[dptypes.BackupTypeLabelKey] = request.GetBackupType()
request.Labels[dptypes.BackupPolicyLabelKey] = request.Spec.BackupPolicyName
// wait for the backup repo controller to prepare the essential resource.
wait := false
if request.BackupRepo != nil {
Expand Down Expand Up @@ -585,33 +606,11 @@ func (r *BackupReconciler) updateStatusIfFailed(

// deleteExternalJobs deletes the external jobs.
func (r *BackupReconciler) deleteExternalJobs(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
jobs := &batchv1.JobList{}
if err := r.Client.List(reqCtx.Ctx, jobs,
client.InNamespace(backup.Namespace),
client.MatchingLabels(dpbackup.BuildBackupWorkloadLabels(backup))); err != nil {
return client.IgnoreNotFound(err)
}

deleteJob := func(job *batchv1.Job) error {
if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, r.Client, job); err != nil {
return err
}
if !job.DeletionTimestamp.IsZero() {
return nil
}
reqCtx.Log.V(1).Info("delete job", "job", job)
if err := intctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, job); err != nil {
return err
}
return nil
}

for i := range jobs.Items {
if err := deleteJob(&jobs.Items[i]); err != nil {
return err
}
labels := dpbackup.BuildBackupWorkloadLabels(backup)
if err := deleteRelatedJobs(reqCtx, r.Client, backup.Namespace, labels); err != nil {
return err
}
return nil
return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels)
}

func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx,
Expand Down
46 changes: 29 additions & 17 deletions controllers/dataprotection/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dprestore "github.com/apecloud/kubeblocks/pkg/dataprotection/restore"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

// RestoreReconciler reconciles a Restore object
Expand Down Expand Up @@ -88,6 +91,10 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return r.newAction(reqCtx, restore)
case dpv1alpha1.RestorePhaseRunning:
return r.inProgressAction(reqCtx, restore)
case dpv1alpha1.RestorePhaseCompleted:
if err = r.deleteExternalResources(reqCtx, restore); err != nil {
return intctrlutil.RequeueWithError(err, reqCtx.Log, "")
}
}
return intctrlutil.Reconciled()
}
Expand All @@ -97,27 +104,32 @@ func (r *RestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dpv1alpha1.Restore{}).
Owns(&batchv1.Job{}).
Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(r.parseRestoreJob)).
Complete(r)
}

func (r *RestoreReconciler) parseRestoreJob(ctx context.Context, object client.Object) []reconcile.Request {
job := object.(*batchv1.Job)
var requests []reconcile.Request
restoreName := job.Labels[dprestore.DataProtectionLabelRestoreKey]
restoreNamespace := job.Labels[dprestore.DataProtectionLabelRestoreNamespaceKey]
if restoreName != "" && restoreNamespace != "" {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: restoreNamespace,
Name: restoreName,
},
})
}
return requests
}

func (r *RestoreReconciler) deleteExternalResources(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) error {
jobs := &batchv1.JobList{}
if err := r.Client.List(reqCtx.Ctx, jobs,
client.InNamespace(restore.Namespace),
client.MatchingLabels(dprestore.BuildRestoreLabels(restore.Name))); err != nil {
return client.IgnoreNotFound(err)
}
for i := range jobs.Items {
job := &jobs.Items[i]
if controllerutil.ContainsFinalizer(job, dptypes.DataProtectionFinalizerName) {
patch := client.MergeFrom(job.DeepCopy())
controllerutil.RemoveFinalizer(job, dptypes.DataProtectionFinalizerName)
if err := r.Patch(reqCtx.Ctx, job, patch); err != nil {
return err
}
}
labels := dprestore.BuildRestoreLabels(restore.Name)
if err := deleteRelatedJobs(reqCtx, r.Client, restore.Namespace, labels); err != nil {
return err
}
return nil
return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels)
}

func (r *RestoreReconciler) newAction(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) (ctrl.Result, error) {
Expand Down
23 changes: 23 additions & 0 deletions controllers/dataprotection/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"sync"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
dputils "github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
)

var (
Expand Down Expand Up @@ -198,6 +200,27 @@ func getDefaultBackupRepo(ctx context.Context, cli client.Client) (*dpv1alpha1.B
return defaultRepo, nil
}

func deleteRelatedJobs(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, labels map[string]string) error {
if labels == nil || namespace == "" {
return nil
}
jobs := &batchv1.JobList{}
if err := cli.List(reqCtx.Ctx, jobs,
client.MatchingLabels(labels)); err != nil {
return client.IgnoreNotFound(err)
}
for i := range jobs.Items {
job := &jobs.Items[i]
if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, job); err != nil {
return err
}
if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, job); err != nil {
return err
}
}
return nil
}

// ============================================================================
// refObjectMapper
// ============================================================================
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
)

const (
KBServiceAcccountName = "KUBEBLOCKS_SERVICEACCOUNT_NAME"
KBToolsImage = "KUBEBLOCKS_TOOLS_IMAGE"
KBImagePullPolicy = "KUBEBLOCKS_IMAGE_PULL_POLICY"
KBDataScriptClientsImage = "KUBEBLOCKS_DATASCRIPT_CLIENTS_IMAGE"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/rsm/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func getEnvConfigMapName(rsmName string) string {
// IsOwnedByRsm is used to judge if the obj is owned by rsm
func IsOwnedByRsm(obj client.Object) bool {
for _, ref := range obj.GetOwnerReferences() {
if ref.Kind == appsv1alpha1.ReplicatedStateMachineKind && ref.Controller != nil && *ref.Controller == true {
if ref.Kind == appsv1alpha1.ReplicatedStateMachineKind && ref.Controller != nil && *ref.Controller {
return true
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/dataprotection/action/action_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ func (j *JobAction) Execute(ctx Context) (*dpv1alpha1.ActionStatus, error) {
}

controllerutil.AddFinalizer(job, types.DataProtectionFinalizerName)
if err = utils.SetControllerReference(j.Owner, job, ctx.Scheme); err != nil {
return handleErr(err)
if job.Namespace == j.Owner.GetNamespace() {
if err = utils.SetControllerReference(j.Owner, job, ctx.Scheme); err != nil {
return handleErr(err)
}
}
msg := fmt.Sprintf("creating job %s/%s", job.Namespace, job.Name)
ctx.Recorder.Event(j.Owner, corev1.EventTypeNormal, "CreatingJob", msg)
Expand Down
23 changes: 16 additions & 7 deletions pkg/dataprotection/backup/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,27 @@ func (r *Request) buildAction(name string, act *dpv1alpha1.ActionSpec) (action.A

func (r *Request) buildExecAction(name string, exec *dpv1alpha1.ExecActionSpec) action.Action {
targetPod := r.TargetPods[0]
objectMeta := *buildBackupJobObjMeta(r.Backup, name)
objectMeta.Labels[dptypes.BackupNamespaceLabelKey] = r.Namespace
// create exec job in kubeblocks namespace for security
objectMeta.Namespace = viper.GetString(constant.CfgKeyCtrlrMgrNS)
containerName := exec.Container
if exec.Container == "" {
containerName = targetPod.Spec.Containers[0].Name
}
return &action.ExecAction{
JobAction: action.JobAction{
Name: name,
ObjectMeta: *buildBackupJobObjMeta(r.Backup, name),
ObjectMeta: objectMeta,
Owner: r.Backup,
},
Command: exec.Command,
Container: exec.Container,
Namespace: targetPod.Namespace,
PodName: targetPod.Name,
Timeout: exec.Timeout,
ServiceAccountName: r.targetServiceAccountName(),
Command: exec.Command,
Container: containerName,
Namespace: targetPod.Namespace,
PodName: targetPod.Name,
Timeout: exec.Timeout,
// use the kubeblocks's serviceAccount
ServiceAccountName: viper.GetString(constant.KBServiceAcccountName),
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/dataprotection/restore/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type restoreJobBuilder struct {
specificVolumeMounts []corev1.VolumeMount
image string
command []string
args []string
tolerations []corev1.Toleration
nodeSelector map[string]string
jobName string
Expand Down Expand Up @@ -135,6 +136,11 @@ func (r *restoreJobBuilder) setCommand(command []string) *restoreJobBuilder {
return r
}

func (r *restoreJobBuilder) setArgs(args []string) *restoreJobBuilder {
r.args = args
return r
}

func (r *restoreJobBuilder) setToleration(tolerations []corev1.Toleration) *restoreJobBuilder {
r.tolerations = tolerations
return r
Expand Down Expand Up @@ -290,6 +296,7 @@ func (r *restoreJobBuilder) build() *batchv1.Job {
Env: r.env,
VolumeMounts: r.specificVolumeMounts,
Command: r.command,
Args: r.args,
// expand the image value with the env variables.
Image: common.Expand(r.image, common.MappingFuncFor(utils.CovertEnvToMap(r.env))),
ImagePullPolicy: corev1.PullIfNotPresent,
Expand Down
22 changes: 15 additions & 7 deletions pkg/dataprotection/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils/boolptr"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

type BackupActionSet struct {
Expand Down Expand Up @@ -457,12 +458,17 @@ func (r *RestoreManager) BuildPostReadyActionJobs(reqCtx intctrlutil.RequestCtx,
if containerName == "" {
containerName = targetPodList[i].Spec.Containers[0].Name
}
command := fmt.Sprintf("kubectl -n %s exec -it pod/%s -c %s -- %s", targetPodList[i].Namespace, targetPodList[i].Name, containerName, actionSpec.Exec.Command)
jobBuilder.setImage(constant.KBToolsImage).setCommand([]string{"sh", "-c", command}).
args := append([]string{"-n", targetPodList[i].Namespace, "exec", targetPodList[i].Name, "-c", containerName, "--"}, actionSpec.Exec.Command...)
jobBuilder.setImage(viper.GetString(constant.KBToolsImage)).setCommand([]string{"kubectl"}).setArgs(args).
setJobName(buildJobName(i)).
setToleration(targetPodList[i].Spec.Tolerations).
addTargetPodAndCredentialEnv(&targetPodList[i], r.Restore.Spec.ReadyConfig.ConnectionCredential)
restoreJobs = append(restoreJobs, jobBuilder.build())
setToleration(targetPodList[i].Spec.Tolerations)
job := jobBuilder.build()
// create exec job in kubeblocks namespace for security
job.Namespace = viper.GetString(constant.CfgKeyCtrlrMgrNS)
job.Labels[DataProtectionLabelRestoreNamespaceKey] = r.Restore.Namespace
// use the kubeblocks's serviceAccount
job.Spec.Template.Spec.ServiceAccountName = viper.GetString(constant.KBServiceAcccountName)
restoreJobs = append(restoreJobs, job)
}
return restoreJobs, nil
}
Expand Down Expand Up @@ -513,8 +519,10 @@ func (r *RestoreManager) CreateJobsIfNotExist(reqCtx intctrlutil.RequestCtx,
if !apierrors.IsNotFound(err) {
return nil, err
}
if err = controllerutil.SetControllerReference(ownerObj, objs[i], r.Schema); err != nil {
return nil, err
if ownerObj.GetNamespace() == objs[i].Namespace {
if err = controllerutil.SetControllerReference(ownerObj, objs[i], r.Schema); err != nil {
return nil, err
}
}
if err = cli.Create(reqCtx.Ctx, objs[i]); err != nil && !apierrors.IsAlreadyExists(err) {
return nil, err
Expand Down
Loading

0 comments on commit 75d94ce

Please sign in to comment.