From 5f98ca9ca31bcc3d84c574fa048a8a3e8842a5a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Mon, 20 Jan 2025 15:45:09 +0300 Subject: [PATCH] K8SPXC-1366: Suspend backup job if cluster becomes unready Backups can put pressure on a PXC cluster. With these changes, we introduce a new safety mechanism to pause backups if cluster becomes unhealthy. This mechanism can be disabled by enabling `spec.unsafeFlags.backupIfUnhealthy`. Operator will periodically check cluster status and ready PXC pods while a backup is running. If ready PXC pods, at any point, becomes less than the desired number of PXC pods, operator will suspend backup job. Suspending the backup job will terminate any running backup pod. Operator will automatically resume the job once ready PXC pods are equal to desired number of PXC pods. --- pkg/apis/pxc/v1/pxc_backup_types.go | 2 +- pkg/controller/pxcbackup/controller.go | 171 ++++++++++++++++++++++--- pkg/naming/labels.go | 9 ++ pkg/pxc/backup/job.go | 6 +- 4 files changed, 163 insertions(+), 25 deletions(-) diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go index 1723100ad..de6dd0124 100644 --- a/pkg/apis/pxc/v1/pxc_backup_types.go +++ b/pkg/apis/pxc/v1/pxc_backup_types.go @@ -164,7 +164,7 @@ type PXCBackupState string const ( BackupNew PXCBackupState = "" - BackupWaiting PXCBackupState = "Waiting" + BackupSuspended PXCBackupState = "Suspended" BackupStarting PXCBackupState = "Starting" BackupRunning PXCBackupState = "Running" BackupFailed PXCBackupState = "Failed" diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go index 57a324c74..5555a0f85 100644 --- a/pkg/controller/pxcbackup/controller.go +++ b/pkg/controller/pxcbackup/controller.go @@ -150,8 +150,11 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return rr, nil } - if err := r.checkPassiveDeadline(ctx, cr); err != nil { - log.Info("Backup didn't start in 
passiveDeadlineSeconds, failing the backup", "passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds) + passedSeconds, err := r.checkPassiveDeadline(ctx, cr) + if err != nil { + log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup", + "passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds, + "passedSeconds", passedSeconds) cr.Status.State = api.BackupFailed cr.Status.Error = err.Error() @@ -194,14 +197,13 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return reconcile.Result{}, err } + if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil { + return rr, errors.Wrap(err, "reconcile backup job") + } + if err := cluster.CanBackup(); err != nil { log.Info("Cluster is not ready for backup", "reason", err.Error()) - cr.Status.State = api.BackupWaiting - if err := r.updateStatus(ctx, cr); err != nil { - return rr, errors.Wrap(err, "update status") - } - return rr, nil } @@ -230,11 +232,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name { log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity) - cr.Status.State = api.BackupWaiting - if err := r.updateStatus(ctx, cr); err != nil { - return rr, errors.Wrap(err, "update status") - } - return rr, nil } } @@ -678,20 +675,22 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { +func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) (float64, error) { + since := time.Since(cr.CreationTimestamp.Time).Seconds() + if cr.Spec.PassiveDeadlineSeconds == nil { - return nil + return since, nil } - if time.Since(cr.CreationTimestamp.Time).Seconds() < float64(*cr.Spec.PassiveDeadlineSeconds) { - return nil 
+ if since < float64(*cr.Spec.PassiveDeadlineSeconds) { + return since, nil } switch cr.Status.State { - case api.BackupNew, api.BackupWaiting: - return errors.New("passive deadline seconds exceeded") + case api.BackupNew: + return since, errors.New("passive deadline seconds exceeded") default: - return nil + return since, nil } } @@ -708,3 +707,137 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context, return r.client.Status().Update(ctx, localCr) }) } + +func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if cluster.Spec.Unsafe.BackupIfUnhealthy { + return nil + } + + if cluster.Status.Status == api.AppStateReady { + return nil + } + + if cluster.Status.PXC.Ready == cluster.Status.PXC.Size { + return nil + } + + log := logf.FromContext(ctx) + + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + return err + } + + suspended := false + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue { + suspended = true + } + } + + if suspended { + return nil + } + + log.Info("Suspending backup job", + "job", jobName, + "clusterStatus", cluster.Status.Status, + "readyPXC", cluster.Status.PXC.Ready) + + t := true + job.Spec.Suspend = &t + + err = r.client.Update(ctx, job) + if err != nil { + return err + } + + cr.Status.State = api.BackupSuspended + return r.updateStatus(ctx, cr) + }) + + return err +} + +func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded( + ctx context.Context, + cr 
*api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if cluster.Status.Status != api.AppStateReady { + return nil + } + + if cluster.Status.PXC.Ready != cluster.Status.PXC.Size { + return nil + } + + log := logf.FromContext(ctx) + + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + return err + } + + suspended := false + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue { + suspended = true + } + } + + if !suspended { + return nil + } + + log.Info("Resuming backup job", + "job", jobName, + "clusterStatus", cluster.Status.Status, + "readyPXC", cluster.Status.PXC.Ready) + + f := false + job.Spec.Suspend = &f + + return r.client.Update(ctx, job) + }) + + return err +} + +func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if err := r.suspendJobIfNeeded(ctx, cr, cluster); err != nil { + return errors.Wrap(err, "suspend job if needed") + } + + if err := r.resumeJobIfNeeded(ctx, cr, cluster); err != nil { + return errors.Wrap(err, "resume job if needed") + } + + return nil +} diff --git a/pkg/naming/labels.go b/pkg/naming/labels.go index 8fe38bd66..4108a890f 100644 --- a/pkg/naming/labels.go +++ b/pkg/naming/labels.go @@ -30,6 +30,15 @@ const ( LabelPerconaRestoreJobName = perconaPrefix + "restore-job-name" ) +func GetLabelBackupType(cr *api.PerconaXtraDBCluster) string { + labelKeyBackupType := "type" + if cr.CompareVersionWith("1.16.0") >= 0 { + labelKeyBackupType = LabelPerconaBackupType + } + + 
return labelKeyBackupType +} + func LabelsCluster(cr *api.PerconaXtraDBCluster) map[string]string { return map[string]string{ LabelAppKubernetesName: "percona-xtradb-cluster", diff --git a/pkg/pxc/backup/job.go b/pkg/pxc/backup/job.go index 476156428..5da545083 100644 --- a/pkg/pxc/backup/job.go +++ b/pkg/pxc/backup/job.go @@ -19,11 +19,7 @@ import ( ) func (*Backup) Job(cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) *batchv1.Job { - labelKeyBackupType := "type" - if cluster.CompareVersionWith("1.16.0") >= 0 { - labelKeyBackupType = naming.LabelPerconaBackupType - } - + labelKeyBackupType := naming.GetLabelBackupType(cluster) jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") return &batchv1.Job{