K8SPXC-1366: Suspend backup job if cluster becomes unready
Backups can put pressure on a PXC cluster. These changes introduce a new
safety mechanism that pauses backups if the cluster becomes unhealthy. To
disable this mechanism, enable `spec.unsafeFlags.backupIfUnhealthy`.
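
For reference, a minimal cluster manifest that opts out of this safety mechanism could look like the sketch below. Only `spec.unsafeFlags.backupIfUnhealthy` is the field this change actually reads; the metadata name and everything else is illustrative boilerplate:

```yaml
apiVersion: pxc.percona.com/v1
kind: PerconaXtraDBCluster
metadata:
  name: cluster1               # illustrative name
spec:
  unsafeFlags:
    backupIfUnhealthy: true    # keep backup jobs running even if the cluster is not ready
  # ... the rest of the PerconaXtraDBCluster spec is unchanged ...
```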

The operator periodically checks the cluster status and the number of ready
PXC pods while a backup is running. If, at any point, the number of ready PXC
pods drops below the desired number, the operator suspends the backup job.
Suspending the job terminates any running backup pod. The operator
automatically resumes the job once the number of ready PXC pods equals the
desired number again.
egegunes committed Feb 5, 2025
1 parent ab6515c commit 5f98ca9
Showing 4 changed files with 163 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/pxc/v1/pxc_backup_types.go
@@ -164,7 +164,7 @@ type PXCBackupState string

const (
BackupNew PXCBackupState = ""
BackupWaiting PXCBackupState = "Waiting"
BackupSuspended PXCBackupState = "Suspended"
BackupStarting PXCBackupState = "Starting"
BackupRunning PXCBackupState = "Running"
BackupFailed PXCBackupState = "Failed"
171 changes: 152 additions & 19 deletions pkg/controller/pxcbackup/controller.go
@@ -150,8 +150,11 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
return rr, nil
}

if err := r.checkPassiveDeadline(ctx, cr); err != nil {
log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup", "passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds)
passedSeconds, err := r.checkPassiveDeadline(ctx, cr)
if err != nil {
log.Info("Backup didn't start in passiveDeadlineSeconds, failing the backup",
"passiveDeadlineSeconds", *cr.Spec.PassiveDeadlineSeconds,
"passedSeconds", passedSeconds)

cr.Status.State = api.BackupFailed
cr.Status.Error = err.Error()
@@ -194,14 +197,13 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
return reconcile.Result{}, err
}

if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil {
return rr, errors.Wrap(err, "reconcile backup job")
}

if err := cluster.CanBackup(); err != nil {
log.Info("Cluster is not ready for backup", "reason", err.Error())

cr.Status.State = api.BackupWaiting
if err := r.updateStatus(ctx, cr); err != nil {
return rr, errors.Wrap(err, "update status")
}

return rr, nil
}

@@ -230,11 +232,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req
if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name {
log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity)

cr.Status.State = api.BackupWaiting
if err := r.updateStatus(ctx, cr); err != nil {
return rr, errors.Wrap(err, "update status")
}

return rr, nil
}
}
@@ -678,20 +675,22 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(
return nil
}

func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error {
func (r *ReconcilePerconaXtraDBClusterBackup) checkPassiveDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) (float64, error) {
since := time.Since(cr.CreationTimestamp.Time).Seconds()

if cr.Spec.PassiveDeadlineSeconds == nil {
return nil
return since, nil
}

if time.Since(cr.CreationTimestamp.Time).Seconds() < float64(*cr.Spec.PassiveDeadlineSeconds) {
return nil
if since < float64(*cr.Spec.PassiveDeadlineSeconds) {
return since, nil
}

switch cr.Status.State {
case api.BackupNew, api.BackupWaiting:
return errors.New("passive deadline seconds exceeded")
case api.BackupNew:
return since, errors.New("passive deadline seconds exceeded")
default:
return nil
return since, nil
}
}

@@ -708,3 +707,137 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context,
return r.client.Status().Update(ctx, localCr)
})
}

func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded(
ctx context.Context,
cr *api.PerconaXtraDBClusterBackup,
cluster *api.PerconaXtraDBCluster,
) error {
if cluster.Spec.Unsafe.BackupIfUnhealthy {
return nil
}

if cluster.Status.Status == api.AppStateReady {
return nil
}

if cluster.Status.PXC.Ready == cluster.Status.PXC.Size {
return nil
}

log := logf.FromContext(ctx)

labelKeyBackupType := naming.GetLabelBackupType(cluster)
jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
job := new(batchv1.Job)

err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job)
if err != nil {
if k8sErrors.IsNotFound(err) {
return nil
}
return err
}

suspended := false
for _, cond := range job.Status.Conditions {
if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue {
suspended = true
}
}

if suspended {
return nil
}

log.Info("Suspending backup job",
"job", jobName,
"clusterStatus", cluster.Status.Status,
"readyPXC", cluster.Status.PXC.Ready)

t := true
job.Spec.Suspend = &t

err = r.client.Update(ctx, job)
if err != nil {
return err
}

cr.Status.State = api.BackupSuspended
return r.updateStatus(ctx, cr)
})

return err
}

func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded(
ctx context.Context,
cr *api.PerconaXtraDBClusterBackup,
cluster *api.PerconaXtraDBCluster,
) error {
if cluster.Status.Status != api.AppStateReady {
return nil
}

if cluster.Status.PXC.Ready != cluster.Status.PXC.Size {
return nil
}

log := logf.FromContext(ctx)

labelKeyBackupType := naming.GetLabelBackupType(cluster)
jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
job := new(batchv1.Job)

err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job)
if err != nil {
if k8sErrors.IsNotFound(err) {
return nil
}
return err
}

suspended := false
for _, cond := range job.Status.Conditions {
if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue {
suspended = true
}
}

if !suspended {
return nil
}

log.Info("Resuming backup job",
"job", jobName,
"clusterStatus", cluster.Status.Status,
"readyPXC", cluster.Status.PXC.Ready)

f := false
job.Spec.Suspend = &f

return r.client.Update(ctx, job)
})

return err
}

func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob(
ctx context.Context,
cr *api.PerconaXtraDBClusterBackup,
cluster *api.PerconaXtraDBCluster,
) error {
if err := r.suspendJobIfNeeded(ctx, cr, cluster); err != nil {
return errors.Wrap(err, "suspend job if needed")
}

if err := r.resumeJobIfNeeded(ctx, cr, cluster); err != nil {
return errors.Wrap(err, "suspend job if needed")
}

return nil
}
9 changes: 9 additions & 0 deletions pkg/naming/labels.go
@@ -30,6 +30,15 @@ const (
LabelPerconaRestoreJobName = perconaPrefix + "restore-job-name"
)

func GetLabelBackupType(cr *api.PerconaXtraDBCluster) string {
labelKeyBackupType := "type"
if cr.CompareVersionWith("1.16.0") >= 0 {
labelKeyBackupType = LabelPerconaBackupType
}

return labelKeyBackupType
}

func LabelsCluster(cr *api.PerconaXtraDBCluster) map[string]string {
return map[string]string{
LabelAppKubernetesName: "percona-xtradb-cluster",
6 changes: 1 addition & 5 deletions pkg/pxc/backup/job.go
@@ -19,11 +19,7 @@ import (
)

func (*Backup) Job(cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) *batchv1.Job {
labelKeyBackupType := "type"
if cluster.CompareVersionWith("1.16.0") >= 0 {
labelKeyBackupType = naming.LabelPerconaBackupType
}

labelKeyBackupType := naming.GetLabelBackupType(cluster)
jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")

return &batchv1.Job{
