From 025ae1ad8eed4b1497214c4e65a1eff61aeb3315 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Fri, 7 Feb 2025 20:56:43 +0300 Subject: [PATCH 1/2] K8SPXC-1366: Add suspendedDeadlineSeconds suspendedDeadlineSeconds allows the user to configure the maximum duration a backup job can wait in the suspended state. This field is optional and can be configured in two separate places: 1. PerconaXtraDBClusterBackup.spec.suspendedDeadlineSeconds: Always used if defined. 2. PerconaXtraDBCluster.spec.backup.suspendedDeadlineSeconds: Used only if not defined in PerconaXtraDBClusterBackup. The suspended deadline is checked against the last transition time of the job status condition with type JobSuspended and status true. Commit 5a077409 introduced startingDeadlineSeconds to fail a backup if it doesn't start before the configured deadline. This commit also allows the user to configure it globally, just like suspendedDeadlineSeconds. startingDeadlineSeconds is an optional field and can be configured in two separate places: 1. PerconaXtraDBClusterBackup.spec.startingDeadlineSeconds: Always used if defined. 2. PerconaXtraDBCluster.spec.backup.startingDeadlineSeconds: Used only if not defined in PerconaXtraDBClusterBackup. --- ...rcona.com_perconaxtradbclusterbackups.yaml | 3 + ...pxc.percona.com_perconaxtradbclusters.yaml | 3 + deploy/backup/backup.yaml | 1 + deploy/bundle.yaml | 6 + deploy/cr.yaml | 1 + deploy/crd.yaml | 6 + deploy/cw-bundle.yaml | 6 + pkg/apis/pxc/v1/pxc_backup_types.go | 11 +- pkg/apis/pxc/v1/pxc_types.go | 25 +-- pkg/apis/pxc/v1/zz_generated.deepcopy.go | 14 +- pkg/controller/pxcbackup/controller.go | 161 ++++++++++++++---- 11 files changed, 187 insertions(+), 50 deletions(-) diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml index 3696e7235..f2f3ce7b5 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml @@ -151,6 +151,9 @@ spec: type: integer storageName: type: string + suspendedDeadlineSeconds: + format: int64 + type: integer type: object status: properties: diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml index 9a2ff3fd8..19775cf02 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml @@ -1083,6 +1083,9 @@ spec: type: object type: object type: object + suspendedDeadlineSeconds: + format: int64 + type: integer type: object crVersion: type: string diff --git a/deploy/backup/backup.yaml b/deploy/backup/backup.yaml index 26536b626..1b25015a9 100644 --- a/deploy/backup/backup.yaml +++ b/deploy/backup/backup.yaml @@ -9,6 +9,7 @@ spec: storageName: fs-pvc # activeDeadlineSeconds: 3600 # startingDeadlineSeconds: 300 +# suspendedDeadlineSeconds: 1200 # containerOptions: # env: # - name: VERIFY_TLS diff --git a/deploy/bundle.yaml b/deploy/bundle.yaml index 33bfe562d..48564eff7 100644 --- a/deploy/bundle.yaml +++ b/deploy/bundle.yaml @@ -150,6 +150,9 @@ spec: type: integer storageName: type: string + suspendedDeadlineSeconds: + format: int64 + type: integer type: object status: properties: @@ -1997,6 +2000,9 @@ spec: type: object type: object type: object + suspendedDeadlineSeconds: + format: int64 + type: integer type: object crVersion: type: string diff --git a/deploy/cr.yaml b/deploy/cr.yaml index 257f7fc4e..ea130fc8c 100644 ---
a/deploy/cr.yaml +++ b/deploy/cr.yaml @@ -606,6 +606,7 @@ spec: # backoffLimit: 6 # activeDeadlineSeconds: 3600 # startingDeadlineSeconds: 300 +# suspendedDeadlineSeconds: 1200 # serviceAccountName: percona-xtradb-cluster-operator # imagePullSecrets: # - name: private-registry-credentials diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 12b435fe6..ab0914d2e 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -150,6 +150,9 @@ spec: type: integer storageName: type: string + suspendedDeadlineSeconds: + format: int64 + type: integer type: object status: properties: @@ -1997,6 +2000,9 @@ spec: type: object type: object type: object + suspendedDeadlineSeconds: + format: int64 + type: integer type: object crVersion: type: string diff --git a/deploy/cw-bundle.yaml b/deploy/cw-bundle.yaml index 2ae2e70f0..e77a920de 100644 --- a/deploy/cw-bundle.yaml +++ b/deploy/cw-bundle.yaml @@ -150,6 +150,9 @@ spec: type: integer storageName: type: string + suspendedDeadlineSeconds: + format: int64 + type: integer type: object status: properties: @@ -1997,6 +2000,9 @@ spec: type: object type: object type: object + suspendedDeadlineSeconds: + format: int64 + type: integer type: object crVersion: type: string diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go index 454dc1c0a..67b25917c 100644 --- a/pkg/apis/pxc/v1/pxc_backup_types.go +++ b/pkg/apis/pxc/v1/pxc_backup_types.go @@ -47,11 +47,12 @@ type PerconaXtraDBClusterBackup struct { } type PXCBackupSpec struct { - PXCCluster string `json:"pxcCluster"` - StorageName string `json:"storageName,omitempty"` - ContainerOptions *BackupContainerOptions `json:"containerOptions,omitempty"` - StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"` - ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + PXCCluster string `json:"pxcCluster"` + StorageName string `json:"storageName,omitempty"` + ContainerOptions *BackupContainerOptions `json:"containerOptions,omitempty"` + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"` + SuspendedDeadlineSeconds *int64 `json:"suspendedDeadlineSeconds,omitempty"` } type PXCBackupStatus struct { diff --git a/pkg/apis/pxc/v1/pxc_types.go b/pkg/apis/pxc/v1/pxc_types.go index 086495d12..4b2943eed 100644 --- a/pkg/apis/pxc/v1/pxc_types.go +++ b/pkg/apis/pxc/v1/pxc_types.go @@ -160,18 +160,19 @@ const ( ) type PXCScheduledBackup struct { - AllowParallel *bool `json:"allowParallel,omitempty"` - Image string `json:"image,omitempty"` - ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` - ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` - Schedule []PXCScheduledBackupSchedule `json:"schedule,omitempty"` - Storages map[string]*BackupStorageSpec `json:"storages,omitempty"` - ServiceAccountName string `json:"serviceAccountName,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` - PITR PITRSpec `json:"pitr,omitempty"` - BackoffLimit *int32 `json:"backoffLimit,omitempty"` - ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` - StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"` + AllowParallel *bool `json:"allowParallel,omitempty"` + Image string `json:"image,omitempty"` + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` + Schedule []PXCScheduledBackupSchedule 
`json:"schedule,omitempty"` + Storages map[string]*BackupStorageSpec `json:"storages,omitempty"` + ServiceAccountName string `json:"serviceAccountName,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + PITR PITRSpec `json:"pitr,omitempty"` + BackoffLimit *int32 `json:"backoffLimit,omitempty"` + ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"` + StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"` + SuspendedDeadlineSeconds *int64 `json:"suspendedDeadlineSeconds,omitempty"` } func (b *PXCScheduledBackup) GetAllowParallel() bool { diff --git a/pkg/apis/pxc/v1/zz_generated.deepcopy.go b/pkg/apis/pxc/v1/zz_generated.deepcopy.go index e5d2c3bad..c30e554a4 100644 --- a/pkg/apis/pxc/v1/zz_generated.deepcopy.go +++ b/pkg/apis/pxc/v1/zz_generated.deepcopy.go @@ -401,13 +401,18 @@ func (in *PXCBackupSpec) DeepCopyInto(out *PXCBackupSpec) { *out = new(BackupContainerOptions) (*in).DeepCopyInto(*out) } + if in.ActiveDeadlineSeconds != nil { + in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds + *out = new(int64) + **out = **in + } if in.StartingDeadlineSeconds != nil { in, out := &in.StartingDeadlineSeconds, &out.StartingDeadlineSeconds *out = new(int64) **out = **in } - if in.ActiveDeadlineSeconds != nil { - in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds + if in.SuspendedDeadlineSeconds != nil { + in, out := &in.SuspendedDeadlineSeconds, &out.SuspendedDeadlineSeconds *out = new(int64) **out = **in } @@ -529,6 +534,11 @@ func (in *PXCScheduledBackup) DeepCopyInto(out *PXCScheduledBackup) { *out = new(int64) **out = **in } + if in.SuspendedDeadlineSeconds != nil { + in, out := &in.SuspendedDeadlineSeconds, &out.SuspendedDeadlineSeconds + *out = new(int64) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PXCScheduledBackup. 
diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go index 5485da244..a3866fbc1 100644 --- a/pkg/controller/pxcbackup/controller.go +++ b/pkg/controller/pxcbackup/controller.go @@ -103,6 +103,11 @@ type ReconcilePerconaXtraDBClusterBackup struct { bcpDeleteInProgress *sync.Map } +var ( + errSuspendedDeadlineExceeded = errors.New("suspended deadline seconds exceeded") + errStartingDeadlineExceeded = errors.New("starting deadline seconds exceeded") +) + // Reconcile reads that state of the cluster for a PerconaXtraDBClusterBackup object and makes changes based on the state read // and what is in the PerconaXtraDBClusterBackup.Spec // Note: @@ -151,14 +156,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return rr, nil } - if err := r.checkStartingDeadline(ctx, cr); err != nil { - if err := r.setFailedStatus(ctx, cr, err); err != nil { - return rr, errors.Wrap(err, "update status") - } - - return reconcile.Result{}, nil - } - cluster, err := r.getCluster(ctx, cr) if err != nil { return reconcile.Result{}, errors.Wrap(err, "get cluster") @@ -187,6 +184,21 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return reconcile.Result{}, err } + if err := r.checkDeadlines(ctx, cluster, cr); err != nil { + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + if errors.Is(err, errSuspendedDeadlineExceeded) { + log.Info("cleaning up suspended backup job") + if err := r.cleanUpSuspendedJob(ctx, cluster, cr); err != nil { + return reconcile.Result{}, errors.Wrap(err, "clean up suspended job") + } + } + + return reconcile.Result{}, nil + } + if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil { return rr, errors.Wrap(err, "reconcile backup job") } @@ -661,24 +673,86 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) checkStartingDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { +func (r *ReconcilePerconaXtraDBClusterBackup) checkDeadlines(ctx context.Context, cluster *api.PerconaXtraDBCluster, cr *api.PerconaXtraDBClusterBackup) error { + if err := checkStartingDeadline(ctx, cluster, cr); err != nil { + return err + } + + if err := r.checkSuspendedDeadline(ctx, cluster, cr); err != nil { + return err + } + + return nil +} + +func checkStartingDeadline(ctx context.Context, cluster *api.PerconaXtraDBCluster, cr *api.PerconaXtraDBClusterBackup) error { log := logf.FromContext(ctx) - since := time.Since(cr.CreationTimestamp.Time).Seconds() + if cr.Status.State != api.BackupNew { + return nil + } + + var deadlineSeconds *int64 + if cr.Spec.StartingDeadlineSeconds != nil { + deadlineSeconds = cr.Spec.StartingDeadlineSeconds + } else if cluster.Spec.Backup.StartingDeadlineSeconds != nil { + deadlineSeconds = cluster.Spec.Backup.StartingDeadlineSeconds + } - if cr.Spec.StartingDeadlineSeconds == nil { + if deadlineSeconds == nil { return nil } - if since < float64(*cr.Spec.StartingDeadlineSeconds) { + since := time.Since(cr.CreationTimestamp.Time).Seconds() + if since < float64(*deadlineSeconds) { return nil } - if cr.Status.State == api.BackupNew { - log.Info("Backup didn't start in startingDeadlineSeconds, failing the backup", - "startingDeadlineSeconds", *cr.Spec.StartingDeadlineSeconds, - "passedSeconds", since) - return errors.New("starting deadline seconds exceeded") + log.Info("Backup didn't start in startingDeadlineSeconds, failing 
the backup", + "startingDeadlineSeconds", *deadlineSeconds, + "passedSeconds", since) + + return errStartingDeadlineExceeded +} + +func (r *ReconcilePerconaXtraDBClusterBackup) checkSuspendedDeadline( + ctx context.Context, + cluster *api.PerconaXtraDBCluster, + cr *api.PerconaXtraDBClusterBackup, +) error { + log := logf.FromContext(ctx) + + job, err := r.getBackupJob(ctx, cluster, cr) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + + return err + } + + var deadlineSeconds *int64 + if cr.Spec.SuspendedDeadlineSeconds != nil { + deadlineSeconds = cr.Spec.SuspendedDeadlineSeconds + } else if cluster.Spec.Backup.SuspendedDeadlineSeconds != nil { + deadlineSeconds = cluster.Spec.Backup.SuspendedDeadlineSeconds + } + + if deadlineSeconds == nil { + return nil + } + + for _, cond := range job.Status.Conditions { + if cond.Type != batchv1.JobSuspended || cond.Status != corev1.ConditionTrue { + continue + } + + if since := time.Since(cond.LastTransitionTime.Time).Seconds(); since > float64(*deadlineSeconds) { + log.Info("Backup didn't resume in suspendedDeadlineSeconds, failing the backup", + "suspendedDeadlineSeconds", *deadlineSeconds, + "passedSeconds", since) + return errSuspendedDeadlineExceeded + } } return nil @@ -726,13 +800,8 @@ func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded( log := logf.FromContext(ctx) - labelKeyBackupType := naming.GetLabelBackupType(cluster) - jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - job := new(batchv1.Job) - - err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + job, err := r.getBackupJob(ctx, cluster, cr) if err != nil { if k8sErrors.IsNotFound(err) { return nil @@ -752,7 +821,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded( } log.Info("Suspending backup job", - "job", jobName, + "job", job.Name, "clusterStatus", cluster.Status.Status, "readyPXC", cluster.Status.PXC.Ready) @@ -785,13 +854,8 @@ func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded( log := logf.FromContext(ctx) - labelKeyBackupType := naming.GetLabelBackupType(cluster) - jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - job := new(batchv1.Job) - - err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + job, err := r.getBackupJob(ctx, cluster, cr) if err != nil { if k8sErrors.IsNotFound(err) { return nil @@ -811,7 +875,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded( } log.Info("Resuming backup job", - "job", jobName, + "job", job.Name, "clusterStatus", cluster.Status.Status, "readyPXC", cluster.Status.PXC.Ready) @@ -838,3 +902,38 @@ func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob( return nil } + +func (r *ReconcilePerconaXtraDBClusterBackup) getBackupJob( + ctx context.Context, + cluster *api.PerconaXtraDBCluster, + cr *api.PerconaXtraDBClusterBackup, +) (*batchv1.Job, error) { + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + return nil, err + } + + return job, nil +} + +func (r *ReconcilePerconaXtraDBClusterBackup) cleanUpSuspendedJob( + ctx context.Context, + cluster 
*api.PerconaXtraDBCluster, + cr *api.PerconaXtraDBClusterBackup, +) error { + job, err := r.getBackupJob(ctx, cluster, cr) + if err != nil { + return errors.Wrap(err, "get job") + } + + if err := r.client.Delete(ctx, job); err != nil { + return errors.Wrap(err, "delete job") + } + + return nil +} From 8202127f77bfd03d164c262d85903181b813200e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Mon, 10 Feb 2025 14:23:17 +0300 Subject: [PATCH 2/2] K8SPXC-1366: Add tests for deadlines --- pkg/controller/pxcbackup/controller.go | 90 -------- pkg/controller/pxcbackup/deadline.go | 104 +++++++++ pkg/controller/pxcbackup/deadline_test.go | 249 ++++++++++++++++++++++ pkg/controller/pxcbackup/suite_test.go | 99 +++++++++ 4 files changed, 452 insertions(+), 90 deletions(-) create mode 100644 pkg/controller/pxcbackup/deadline.go create mode 100644 pkg/controller/pxcbackup/deadline_test.go create mode 100644 pkg/controller/pxcbackup/suite_test.go diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go index a3866fbc1..806aebe81 100644 --- a/pkg/controller/pxcbackup/controller.go +++ b/pkg/controller/pxcbackup/controller.go @@ -103,11 +103,6 @@ type ReconcilePerconaXtraDBClusterBackup struct { bcpDeleteInProgress *sync.Map } -var ( - errSuspendedDeadlineExceeded = errors.New("suspended deadline seconds exceeded") - errStartingDeadlineExceeded = errors.New("starting deadline seconds exceeded") -) - // Reconcile reads that state of the cluster for a PerconaXtraDBClusterBackup object and makes changes based on the state read // and what is in the PerconaXtraDBClusterBackup.Spec // Note: @@ -673,91 +668,6 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) checkDeadlines(ctx context.Context, cluster *api.PerconaXtraDBCluster, cr *api.PerconaXtraDBClusterBackup) error { - if err := checkStartingDeadline(ctx, cluster, cr); err != nil { - return err - } - - if err := r.checkSuspendedDeadline(ctx, cluster, cr); err != nil { - return err - } - - return nil -} - -func checkStartingDeadline(ctx context.Context, cluster *api.PerconaXtraDBCluster, cr *api.PerconaXtraDBClusterBackup) error { - log := logf.FromContext(ctx) - - if cr.Status.State != api.BackupNew { - return nil - } - - var deadlineSeconds *int64 - if cr.Spec.StartingDeadlineSeconds != nil { - deadlineSeconds = cr.Spec.StartingDeadlineSeconds - } else if cluster.Spec.Backup.StartingDeadlineSeconds != nil { - deadlineSeconds = cluster.Spec.Backup.StartingDeadlineSeconds - } - - if deadlineSeconds == nil { - return nil - } - - since := time.Since(cr.CreationTimestamp.Time).Seconds() - if since < float64(*deadlineSeconds) { - return nil - } - - log.Info("Backup didn't start in startingDeadlineSeconds, failing the backup", - "startingDeadlineSeconds", *deadlineSeconds, - "passedSeconds", since) - - return errStartingDeadlineExceeded -} - -func (r *ReconcilePerconaXtraDBClusterBackup) checkSuspendedDeadline( - ctx context.Context, - cluster *api.PerconaXtraDBCluster, - cr *api.PerconaXtraDBClusterBackup, -) error { - log := logf.FromContext(ctx) - - job, err := r.getBackupJob(ctx, cluster, cr) - if err != nil { - if k8sErrors.IsNotFound(err) { - return nil - } - - return err - } - - var deadlineSeconds *int64 - if cr.Spec.SuspendedDeadlineSeconds != nil { - deadlineSeconds = cr.Spec.SuspendedDeadlineSeconds - } else if cluster.Spec.Backup.SuspendedDeadlineSeconds != nil { - deadlineSeconds = 
cluster.Spec.Backup.SuspendedDeadlineSeconds - } - - if deadlineSeconds == nil { - return nil - } - - for _, cond := range job.Status.Conditions { - if cond.Type != batchv1.JobSuspended || cond.Status != corev1.ConditionTrue { - continue - } - - if since := time.Since(cond.LastTransitionTime.Time).Seconds(); since > float64(*deadlineSeconds) { - log.Info("Backup didn't resume in suspendedDeadlineSeconds, failing the backup", - "suspendedDeadlineSeconds", *deadlineSeconds, - "passedSeconds", since) - return errSuspendedDeadlineExceeded - } - } - - return nil -} - func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { localCr := new(api.PerconaXtraDBClusterBackup) diff --git a/pkg/controller/pxcbackup/deadline.go b/pkg/controller/pxcbackup/deadline.go new file mode 100644 index 000000000..dc1e63f2d --- /dev/null +++ b/pkg/controller/pxcbackup/deadline.go @@ -0,0 +1,104 @@ +package pxcbackup + +import ( + "context" + "time" + + "github.com/pkg/errors" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" +) + +var ( + errSuspendedDeadlineExceeded = errors.New("suspended deadline seconds exceeded") + errStartingDeadlineExceeded = errors.New("starting deadline seconds exceeded") +) + +func (r *ReconcilePerconaXtraDBClusterBackup) checkDeadlines(ctx context.Context, cluster *api.PerconaXtraDBCluster, cr *api.PerconaXtraDBClusterBackup) error { + if err := checkStartingDeadline(ctx, cluster, cr); err != nil { + return err + } + + if err := r.checkSuspendedDeadline(ctx, cluster, cr); err != nil { + return err + } + + return nil +} + +func checkStartingDeadline(ctx context.Context, cluster *api.PerconaXtraDBCluster, cr *api.PerconaXtraDBClusterBackup) error { + log := logf.FromContext(ctx) + + if cr.Status.State != api.BackupNew { + return nil + } + + var deadlineSeconds *int64 + if cr.Spec.StartingDeadlineSeconds != nil { + deadlineSeconds = cr.Spec.StartingDeadlineSeconds + } else if cluster.Spec.Backup.StartingDeadlineSeconds != nil { + deadlineSeconds = cluster.Spec.Backup.StartingDeadlineSeconds + } + + if deadlineSeconds == nil { + return nil + } + + since := time.Since(cr.CreationTimestamp.Time).Seconds() + if since < float64(*deadlineSeconds) { + return nil + } + + log.Info("Backup didn't start in startingDeadlineSeconds, failing the backup", + "startingDeadlineSeconds", *deadlineSeconds, + "passedSeconds", since) + + return errStartingDeadlineExceeded +} + +func (r *ReconcilePerconaXtraDBClusterBackup) checkSuspendedDeadline( + ctx context.Context, + cluster *api.PerconaXtraDBCluster, + cr *api.PerconaXtraDBClusterBackup, +) error { + log := logf.FromContext(ctx) + + job, err := r.getBackupJob(ctx, cluster, cr) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + + return err + } + + var deadlineSeconds *int64 + if cr.Spec.SuspendedDeadlineSeconds != nil { + deadlineSeconds = cr.Spec.SuspendedDeadlineSeconds + } else if cluster.Spec.Backup.SuspendedDeadlineSeconds != nil { + deadlineSeconds = cluster.Spec.Backup.SuspendedDeadlineSeconds + } + + if deadlineSeconds == nil { + return nil + } + + for _, cond := range job.Status.Conditions { + if cond.Type != batchv1.JobSuspended || cond.Status != corev1.ConditionTrue { + continue + } + + if since := 
time.Since(cond.LastTransitionTime.Time).Seconds(); since > float64(*deadlineSeconds) { + log.Info("Backup didn't resume in suspendedDeadlineSeconds, failing the backup", + "suspendedDeadlineSeconds", *deadlineSeconds, + "passedSeconds", since) + return errSuspendedDeadlineExceeded + } + } + + return nil +} diff --git a/pkg/controller/pxcbackup/deadline_test.go b/pkg/controller/pxcbackup/deadline_test.go new file mode 100644 index 000000000..9d5f29c1e --- /dev/null +++ b/pkg/controller/pxcbackup/deadline_test.go @@ -0,0 +1,249 @@ +package pxcbackup + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + pxcv1 "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" + "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup" +) + +var _ = Describe("Starting deadline", func() { + It("should be optional", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + cluster.Spec.Backup.StartingDeadlineSeconds = nil + + bcp.Spec.StartingDeadlineSeconds = nil + bcp.Status.State = pxcv1.BackupNew + + err = checkStartingDeadline(context.Background(), cluster, bcp) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should use universal value if defined", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + cluster.Spec.Backup.StartingDeadlineSeconds = ptr.To(int64(60)) + + bcp.Status.State = pxcv1.BackupNew + bcp.ObjectMeta.CreationTimestamp = metav1.NewTime(time.Now().Add(-2 * time.Minute)) + + err = checkStartingDeadline(context.Background(), cluster, bcp) + Expect(err).To(HaveOccurred()) + }) + + It("should use particular value if defined", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + cluster.Spec.Backup.StartingDeadlineSeconds = ptr.To(int64(600)) + + bcp.Status.State = pxcv1.BackupNew + bcp.ObjectMeta.CreationTimestamp = metav1.NewTime(time.Now().Add(-2 * time.Minute)) + bcp.Spec.StartingDeadlineSeconds = ptr.To(int64(60)) + + err = checkStartingDeadline(context.Background(), cluster, bcp) + Expect(err).To(HaveOccurred()) + }) + + It("should not return an error", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + cluster.Spec.Backup.StartingDeadlineSeconds = ptr.To(int64(600)) + + bcp.Status.State = pxcv1.BackupNew + bcp.ObjectMeta.CreationTimestamp = metav1.NewTime(time.Now().Add(-2 * time.Minute)) + bcp.Spec.StartingDeadlineSeconds = ptr.To(int64(300)) + + err = checkStartingDeadline(context.Background(), cluster, bcp) + Expect(err).ToNot(HaveOccurred()) + }) +}) + +var _ = Describe("Suspended deadline", func() { + It("should do an early return without a job", func() { + r := reconciler(buildFakeClient()) + + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp, err := readDefaultBackup("backup1", "test") + 
Expect(err).ToNot(HaveOccurred()) + + err = r.checkSuspendedDeadline(context.Background(), cluster, bcp) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should be optional", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + cr, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp := backup.New(cluster) + job := bcp.Job(cr, cluster) + + job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, "") + Expect(err).ToNot(HaveOccurred()) + + job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{ + Type: batchv1.JobSuspended, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)), + }) + + r := reconciler(buildFakeClient(job)) + + cluster.Spec.Backup.SuspendedDeadlineSeconds = nil + cr.Spec.SuspendedDeadlineSeconds = nil + + err = r.checkSuspendedDeadline(context.Background(), cluster, cr) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should use universal value if defined", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + cr, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp := backup.New(cluster) + job := bcp.Job(cr, cluster) + + job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, "") + Expect(err).ToNot(HaveOccurred()) + + job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{ + Type: batchv1.JobSuspended, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)), + }) + + r := reconciler(buildFakeClient(job)) + + cluster.Spec.Backup.SuspendedDeadlineSeconds = ptr.To(int64(60)) + cr.Spec.SuspendedDeadlineSeconds = nil + + err = r.checkSuspendedDeadline(context.Background(), cluster, cr) + Expect(err).To(HaveOccurred()) + }) + + It("should use particular value if defined", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + cr, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp := backup.New(cluster) + job := bcp.Job(cr, cluster) + + job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, "") + Expect(err).ToNot(HaveOccurred()) + + job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{ + Type: batchv1.JobSuspended, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)), + }) + + r := reconciler(buildFakeClient(job)) + + cluster.Spec.Backup.SuspendedDeadlineSeconds = ptr.To(int64(600)) + cr.Spec.SuspendedDeadlineSeconds = ptr.To(int64(60)) + + err = r.checkSuspendedDeadline(context.Background(), cluster, cr) + Expect(err).To(HaveOccurred()) + }) + + It("should clean up suspended job", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + cr, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp := backup.New(cluster) + job := bcp.Job(cr, cluster) + + job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, "") + Expect(err).ToNot(HaveOccurred()) + + job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{ + Type: batchv1.JobSuspended, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)), + }) + + cl := buildFakeClient(job) + r := reconciler(cl) + + cr.Spec.SuspendedDeadlineSeconds = ptr.To(int64(60)) + + err = r.checkSuspendedDeadline(context.Background(), cluster, cr) + Expect(err).To(HaveOccurred()) + + 
err = r.cleanUpSuspendedJob(context.Background(), cluster, cr) + Expect(err).NotTo(HaveOccurred()) + + j := new(batchv1.Job) + err = cl.Get(context.Background(), client.ObjectKeyFromObject(job), j) + Expect(err).To(HaveOccurred()) + Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + }) + + It("should not return an error", func() { + cluster, err := readDefaultCR("cluster1", "test") + Expect(err).ToNot(HaveOccurred()) + + cr, err := readDefaultBackup("backup1", "test") + Expect(err).ToNot(HaveOccurred()) + + bcp := backup.New(cluster) + job := bcp.Job(cr, cluster) + + job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, "") + Expect(err).ToNot(HaveOccurred()) + + job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{ + Type: batchv1.JobSuspended, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)), + }) + + r := reconciler(buildFakeClient(job)) + + cluster.Spec.Backup.SuspendedDeadlineSeconds = ptr.To(int64(600)) + cr.Spec.SuspendedDeadlineSeconds = ptr.To(int64(300)) + + err = r.checkSuspendedDeadline(context.Background(), cluster, cr) + Expect(err).ToNot(HaveOccurred()) + }) +}) diff --git a/pkg/controller/pxcbackup/suite_test.go b/pkg/controller/pxcbackup/suite_test.go new file mode 100644 index 000000000..7ae10e616 --- /dev/null +++ b/pkg/controller/pxcbackup/suite_test.go @@ -0,0 +1,99 @@ +package pxcbackup + +import ( + "context" + "os" + "path/filepath" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/yaml" + k8sversion "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" // nolint + logf "sigs.k8s.io/controller-runtime/pkg/log" + + pxcv1 "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" + "github.com/percona/percona-xtradb-cluster-operator/version" +) + +func TestPxcbackup(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PerconaXtraDBClusterBackup Suite") +} + +func readDefaultCR(name, namespace string) (*pxcv1.PerconaXtraDBCluster, error) { + data, err := os.ReadFile(filepath.Join("..", "..", "..", "deploy", "cr.yaml")) + if err != nil { + return nil, err + } + + cr := &pxcv1.PerconaXtraDBCluster{} + + if err := yaml.Unmarshal(data, cr); err != nil { + return cr, err + } + + cr.Name = name + cr.Namespace = namespace + cr.Spec.InitImage = "perconalab/percona-xtradb-cluster-operator:main" + b := false + cr.Spec.PXC.AutoRecovery = &b + + v := version.ServerVersion{ + Platform: version.PlatformKubernetes, + Info: k8sversion.Info{}, + } + + log := logf.FromContext(context.Background()) + if err := cr.CheckNSetDefaults(&v, log); err != nil { + return cr, err + } + + return cr, nil +} + +func readDefaultBackup(name, namespace string) (*pxcv1.PerconaXtraDBClusterBackup, error) { + data, err := os.ReadFile(filepath.Join("..", "..", "..", "deploy", "backup", "backup.yaml")) + if err != nil { + return nil, err + } + + cr := &pxcv1.PerconaXtraDBClusterBackup{} + + if err := yaml.Unmarshal(data, cr); err != nil { + return cr, err + } + + cr.Name = name + cr.Namespace = namespace + + return cr, nil +} + +func reconciler(cl client.Client) *ReconcilePerconaXtraDBClusterBackup { + return &ReconcilePerconaXtraDBClusterBackup{ + client: cl, + scheme: cl.Scheme(), + } +} + +// buildFakeClient creates a fake client to mock API calls with the mock objects +func buildFakeClient(objs ...runtime.Object) 
client.Client { + s := scheme.Scheme + + s.AddKnownTypes(pxcv1.SchemeGroupVersion, new(pxcv1.PerconaXtraDBClusterRestore)) + s.AddKnownTypes(pxcv1.SchemeGroupVersion, new(pxcv1.PerconaXtraDBClusterBackup)) + s.AddKnownTypes(pxcv1.SchemeGroupVersion, new(pxcv1.PerconaXtraDBCluster)) + + cl := fake.NewClientBuilder(). + WithScheme(s). + WithRuntimeObjects(objs...). + WithStatusSubresource(&pxcv1.PerconaXtraDBClusterRestore{}). + Build() + + return cl +}
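For reference, a minimal configuration sketch showing how the two deadlines added by these patches could be set together. The field names and resource kinds come from the patch (cluster-wide defaults under PerconaXtraDBCluster spec.backup, per-backup overrides on PerconaXtraDBClusterBackup.spec); the apiVersion and all numeric values are illustrative assumptions, not part of the patch:

apiVersion: pxc.percona.com/v1
kind: PerconaXtraDBCluster
metadata:
  name: cluster1
spec:
  backup:
    # cluster-wide defaults: fail backups that don't start within 5 minutes
    startingDeadlineSeconds: 300
    # ...or that stay suspended for longer than 20 minutes
    suspendedDeadlineSeconds: 1200
---
apiVersion: pxc.percona.com/v1
kind: PerconaXtraDBClusterBackup
metadata:
  name: backup1
spec:
  pxcCluster: cluster1
  storageName: fs-pvc
  # per-backup values take precedence over spec.backup on the cluster
  startingDeadlineSeconds: 600
  suspendedDeadlineSeconds: 1800

When both levels are set, checkStartingDeadline and checkSuspendedDeadline use the per-backup value; when neither is set, no deadline is enforced. If the suspended deadline is exceeded, the controller sets the backup status to failed and removes the suspended job via cleanUpSuspendedJob.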