diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml
index a8e745cb8..3696e7235 100644
--- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml
+++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml
@@ -146,6 +146,9 @@ spec:
               type: object
             pxcCluster:
               type: string
+            startingDeadlineSeconds:
+              format: int64
+              type: integer
             storageName:
               type: string
           type: object
@@ -203,6 +206,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml
index 50e42bb77..1a964ea17 100644
--- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml
+++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml
@@ -101,6 +101,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -275,6 +277,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml
index f79927e7e..9a2ff3fd8 100644
--- a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml
+++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml
@@ -143,6 +143,9 @@ spec:
                 type: array
               serviceAccountName:
                 type: string
+              startingDeadlineSeconds:
+                format: int64
+                type: integer
               storages:
                 additionalProperties:
                   properties:
diff --git a/deploy/backup/backup.yaml b/deploy/backup/backup.yaml
index 2a65ed2ce..26536b626 100644
--- a/deploy/backup/backup.yaml
+++ b/deploy/backup/backup.yaml
@@ -8,6 +8,7 @@ spec:
   pxcCluster: cluster1
   storageName: fs-pvc
 #  activeDeadlineSeconds: 3600
+#  startingDeadlineSeconds: 300
 #  containerOptions:
 #    env:
 #    - name: VERIFY_TLS
diff --git a/deploy/bundle.yaml b/deploy/bundle.yaml
index 4604da4fa..33bfe562d 100644
--- a/deploy/bundle.yaml
+++ b/deploy/bundle.yaml
@@ -145,6 +145,9 @@ spec:
               type: object
             pxcCluster:
               type: string
+            startingDeadlineSeconds:
+              format: int64
+              type: integer
             storageName:
               type: string
           type: object
@@ -202,6 +205,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -344,6 +349,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -518,6 +525,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -1048,6 +1057,9 @@ spec:
                 type: array
               serviceAccountName:
                 type: string
+              startingDeadlineSeconds:
+                format: int64
+                type: integer
               storages:
                 additionalProperties:
                   properties:
diff --git a/deploy/cr.yaml b/deploy/cr.yaml
index 152620d81..257f7fc4e 100644
--- a/deploy/cr.yaml
+++ b/deploy/cr.yaml
@@ -605,6 +605,7 @@ spec:
     image: perconalab/percona-xtradb-cluster-operator:main-pxc8.0-backup
 #    backoffLimit: 6
 #    activeDeadlineSeconds: 3600
+#    startingDeadlineSeconds: 300
 #    serviceAccountName: percona-xtradb-cluster-operator
 #    imagePullSecrets:
 #      - name: private-registry-credentials
diff --git a/deploy/crd.yaml b/deploy/crd.yaml
index 63c7dcd41..12b435fe6 100644
--- a/deploy/crd.yaml
+++ b/deploy/crd.yaml
@@ -145,6 +145,9 @@ spec:
               type: object
             pxcCluster:
               type: string
+            startingDeadlineSeconds:
+              format: int64
+              type: integer
             storageName:
               type: string
           type: object
@@ -202,6 +205,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -344,6 +349,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -518,6 +525,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -1048,6 +1057,9 @@ spec:
                 type: array
               serviceAccountName:
                 type: string
+              startingDeadlineSeconds:
+                format: int64
+                type: integer
               storages:
                 additionalProperties:
                   properties:
diff --git a/deploy/cw-bundle.yaml b/deploy/cw-bundle.yaml
index f536667bd..2ae2e70f0 100644
--- a/deploy/cw-bundle.yaml
+++ b/deploy/cw-bundle.yaml
@@ -145,6 +145,9 @@ spec:
               type: object
             pxcCluster:
               type: string
+            startingDeadlineSeconds:
+              format: int64
+              type: integer
             storageName:
               type: string
           type: object
@@ -202,6 +205,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -344,6 +349,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -518,6 +525,8 @@ spec:
               type: array
             destination:
               type: string
+            error:
+              type: string
             image:
               type: string
             lastscheduled:
@@ -1048,6 +1057,9 @@ spec:
                 type: array
               serviceAccountName:
                 type: string
+              startingDeadlineSeconds:
+                format: int64
+                type: integer
               storages:
                 additionalProperties:
                   properties:
diff --git a/e2e-tests/demand-backup-parallel/conf/backup.yml b/e2e-tests/demand-backup-parallel/conf/backup.yml
new file mode 100644
index 000000000..17d3cac81
--- /dev/null
+++ b/e2e-tests/demand-backup-parallel/conf/backup.yml
@@ -0,0 +1,7 @@
+apiVersion: pxc.percona.com/v1
+kind: PerconaXtraDBClusterBackup
+metadata:
+  name:
+spec:
+  pxcCluster: demand-backup-parallel
+  storageName: minio
diff --git a/e2e-tests/demand-backup-parallel/conf/cr.yml b/e2e-tests/demand-backup-parallel/conf/cr.yml
new file mode 100644
index 000000000..677b9936f
--- /dev/null
+++ b/e2e-tests/demand-backup-parallel/conf/cr.yml
@@ -0,0 +1,88 @@
+apiVersion: pxc.percona.com/v1
+kind: PerconaXtraDBCluster
+metadata:
+  name: demand-backup-parallel
+  finalizers:
+    - percona.com/delete-pxc-pods-in-order
+  # annotations:
+  #   percona.com/issue-vault-token: "true"
+spec:
+  tls:
+    SANs:
+      - "minio-service.#namespace"
+  secretsName: my-cluster-secrets
+  vaultSecretName: some-name-vault
+  pause: false
+  pxc:
+    size: 3
+    image: -pxc
+    configuration: |
+      [mysqld]
+      wsrep_log_conflicts
+      log_error_verbosity=3
+      wsrep_debug=1
+      [sst]
+      xbstream-opts=--decompress
+      [xtrabackup]
+      compress=lz4
+    resources:
+      requests:
+        memory: 0.1G
+        cpu: 100m
+      limits:
+        memory: "2G"
+        cpu: "1"
+    volumeSpec:
+      persistentVolumeClaim:
+        resources:
+          requests:
+            storage: 2Gi
+    affinity:
+      antiAffinityTopologyKey: "kubernetes.io/hostname"
+  haproxy:
+    enabled: true
+    size: 2
+    image: -haproxy
+    resources:
+      requests:
+        memory: 0.1G
+        cpu: 100m
+      limits:
+        memory: 1G
+        cpu: 700m
+    affinity:
+      antiAffinityTopologyKey: "kubernetes.io/hostname"
+  pmm:
+    enabled: false
+    image: perconalab/pmm-client:1.17.1
+    serverHost: monitoring-service
+    serverUser: pmm
+  backup:
+    activeDeadlineSeconds: 3600
+    allowParallel: false
+    backoffLimit: 3
+    image: -backup
+    storages:
+      pvc:
+        type: filesystem
+        volume:
+          persistentVolumeClaim:
+            accessModes: [ "ReadWriteOnce" ]
+            resources:
+              requests:
+                storage: 1Gi
+      minio:
+        type: s3
+        resources:
+          requests:
+            memory: 0.5G
+            cpu: 500m
+          limits:
+            memory: "2G"
+            cpu: "1"
+        s3:
+          credentialsSecret: minio-secret
+          region: us-east-1
+          bucket: operator-testing/prefix/subfolder
+          endpointUrl: http://minio-service.#namespace:9000/
+          verifyTLS: false
diff --git a/e2e-tests/demand-backup-parallel/run b/e2e-tests/demand-backup-parallel/run
new file mode 100755
index 000000000..328cae51e
--- /dev/null
+++ b/e2e-tests/demand-backup-parallel/run
@@ -0,0 +1,64 @@
+#!/bin/bash
+
+# This test checks if spec.backup.allowParallel=false works as expected.
+
+set -o errexit
+
+test_dir=$(realpath $(dirname $0))
+. ${test_dir}/../functions
+
+set_debug
+
+function run_backup() {
+	local name=$1
+	yq eval ".metadata.name = \"${name}\"" ${test_dir}/conf/backup.yml \
+		| kubectl_bin apply -f -
+}
+
+function check_active_backup_count() {
+	active_backup_count=$(kubectl_bin get pxc-backup | grep -E 'Starting|Running' | wc -l)
+	if [[ ${active_backup_count} -gt 1 ]]; then
+		log "There are ${active_backup_count} active backups. 'allowParallel: false' doesn't work properly"
+		exit 1
+	fi
+}
+
+create_infra ${namespace}
+
+start_minio
+
+log "creating PXC client"
+kubectl_bin apply -f ${conf_dir}/client.yml
+
+log "creating cluster secrets"
+kubectl_bin apply -f ${conf_dir}/secrets.yml
+
+cluster="demand-backup-parallel"
+log "create PXC cluster: ${cluster}"
+apply_config ${test_dir}/conf/cr.yml
+
+desc 'creating backups'
+run_backup backup1
+run_backup backup2
+run_backup backup3
+run_backup backup4
+
+wait_cluster_consistency ${cluster} 3 2
+sleep 5
+check_active_backup_count
+
+for i in $(seq 0 3); do
+	sleep 5
+	check_active_backup_count
+	holder=$(kubectl_bin get lease pxc-${cluster}-backup-lock -o jsonpath={.spec.holderIdentity})
+	log "Backup lock holder: ${holder}"
+	wait_backup ${holder}
+done
+
+# explicitly check all backups to ensure all succeeded
+wait_backup backup1
+wait_backup backup2
+wait_backup backup3
+wait_backup backup4
+
+log "test passed"
diff --git a/e2e-tests/functions b/e2e-tests/functions
index ff4fe01ef..51c7ee660 100755
--- a/e2e-tests/functions
+++ b/e2e-tests/functions
@@ -60,6 +60,11 @@ set_debug() {
 	fi
 }
 
+log() {
+	echo "[$(date +%Y-%m-%dT%H:%M:%S%z)]" $*
+}
+
+
 HELM_VERSION=$(helm version -c | $sed -re 's/.*SemVer:"([^"]+)".*/\1/; s/.*\bVersion:"([^"]+)".*/\1/')
 if [ "${HELM_VERSION:0:2}" == "v2" ]; then
 	HELM_ARGS="--name"
@@ -98,10 +103,11 @@ wait_cluster_consistency() {
 	local i=0
 	local max=36
 	sleep 7 # wait for two reconcile loops ;) 3 sec x 2 times + 1 sec = 7 seconds
+	echo -n "waiting for pxc/${cluster_name} to be ready"
 	until [[ "$(kubectl_bin get pxc "${cluster_name}" -o jsonpath='{.status.state}')" == "ready" &&
 		"$(kubectl_bin get pxc "${cluster_name}" -o jsonpath='{.status.pxc.ready}')" == "${cluster_size}" &&
 		"$(kubectl_bin get pxc "${cluster_name}" -o jsonpath='{.status.'$(get_proxy_engine ${cluster_name})'.ready}')" == "${proxy_size}" ]]; do
-		echo 'waiting for cluster readyness'
+		echo -n .
 		sleep 20
 		if [[ $i -ge $max ]]; then
 			echo "Something went wrong waiting for cluster consistency!"
@@ -109,6 +115,7 @@ wait_cluster_consistency() {
 		fi
 		let i+=1
 	done
+	echo
 }
 
 create_namespace() {
@@ -235,7 +242,7 @@ wait_backup() {
 	set +o xtrace
 	retry=0
-	echo -n $backup
+	echo -n "waiting for pxc-backup/${backup} to reach ${status} state"
 	until kubectl_bin get pxc-backup/$backup -o jsonpath='{.status.state}' 2>/dev/null | grep $status; do
 		sleep 1
 		echo -n .
diff --git a/e2e-tests/run-pr.csv b/e2e-tests/run-pr.csv
index 8a8bdfc34..84a7c595f 100644
--- a/e2e-tests/run-pr.csv
+++ b/e2e-tests/run-pr.csv
@@ -5,6 +5,7 @@ custom-users,8.0
 demand-backup-cloud,8.0
 demand-backup-encrypted-with-tls,8.0
 demand-backup,8.0
+demand-backup-parallel,8.0
 haproxy,5.7
 haproxy,8.0
 init-deploy,5.7
diff --git a/e2e-tests/run-release.csv b/e2e-tests/run-release.csv
index 29456f42f..2f4e182e7 100644
--- a/e2e-tests/run-release.csv
+++ b/e2e-tests/run-release.csv
@@ -5,6 +5,7 @@ cross-site
 custom-users
 default-cr
 demand-backup
+demand-backup-parallel
 demand-backup-cloud
 demand-backup-encrypted-with-tls
 haproxy
diff --git a/pkg/apis/pxc/v1/pxc_backup_types.go b/pkg/apis/pxc/v1/pxc_backup_types.go
index ce62c38c8..454dc1c0a 100644
--- a/pkg/apis/pxc/v1/pxc_backup_types.go
+++ b/pkg/apis/pxc/v1/pxc_backup_types.go
@@ -47,14 +47,16 @@ type PerconaXtraDBClusterBackup struct {
 }
 
 type PXCBackupSpec struct {
-	PXCCluster            string                  `json:"pxcCluster"`
-	StorageName           string                  `json:"storageName,omitempty"`
-	ContainerOptions      *BackupContainerOptions `json:"containerOptions,omitempty"`
-	ActiveDeadlineSeconds *int64                  `json:"activeDeadlineSeconds,omitempty"`
+	PXCCluster              string                  `json:"pxcCluster"`
+	StorageName             string                  `json:"storageName,omitempty"`
+	ContainerOptions        *BackupContainerOptions `json:"containerOptions,omitempty"`
+	StartingDeadlineSeconds *int64                  `json:"startingDeadlineSeconds,omitempty"`
+	ActiveDeadlineSeconds   *int64                  `json:"activeDeadlineSeconds,omitempty"`
 }
 
 type PXCBackupStatus struct {
 	State         PXCBackupState       `json:"state,omitempty"`
+	Error         string               `json:"error,omitempty"`
 	CompletedAt   *metav1.Time         `json:"completed,omitempty"`
 	LastScheduled *metav1.Time         `json:"lastscheduled,omitempty"`
 	Destination   PXCBackupDestination `json:"destination,omitempty"`
@@ -162,6 +164,7 @@ type PXCBackupState string
 
 const (
 	BackupNew       PXCBackupState = ""
+	BackupSuspended PXCBackupState = "Suspended"
 	BackupStarting  PXCBackupState = "Starting"
 	BackupRunning   PXCBackupState = "Running"
 	BackupFailed    PXCBackupState = "Failed"
@@ -185,3 +188,8 @@ func (cr *PerconaXtraDBClusterBackup) OwnerRef(scheme *runtime.Scheme) (metav1.O
 		Controller: &trueVar,
 	}, nil
 }
+
+func (cr *PerconaXtraDBClusterBackup) SetFailedStatusWithError(err error) {
+	cr.Status.State = BackupFailed
+	cr.Status.Error = err.Error()
+}
diff --git a/pkg/apis/pxc/v1/pxc_types.go b/pkg/apis/pxc/v1/pxc_types.go
index df91b7c84..086495d12 100644
--- a/pkg/apis/pxc/v1/pxc_types.go
+++ b/pkg/apis/pxc/v1/pxc_types.go
@@ -160,17 +160,18 @@ const (
 )
 
 type PXCScheduledBackup struct {
-	AllowParallel         *bool                         `json:"allowParallel,omitempty"`
-	Image                 string                        `json:"image,omitempty"`
-	ImagePullSecrets      []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
-	ImagePullPolicy       corev1.PullPolicy             `json:"imagePullPolicy,omitempty"`
-	Schedule              []PXCScheduledBackupSchedule  `json:"schedule,omitempty"`
-	Storages              map[string]*BackupStorageSpec `json:"storages,omitempty"`
-	ServiceAccountName    string                        `json:"serviceAccountName,omitempty"`
-	Annotations           map[string]string             `json:"annotations,omitempty"`
-	PITR                  PITRSpec                      `json:"pitr,omitempty"`
-	BackoffLimit          *int32                        `json:"backoffLimit,omitempty"`
-	ActiveDeadlineSeconds *int64                        `json:"activeDeadlineSeconds,omitempty"`
+	AllowParallel           *bool                         `json:"allowParallel,omitempty"`
+	Image                   string                        `json:"image,omitempty"`
+	ImagePullSecrets        []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
+	ImagePullPolicy         corev1.PullPolicy             `json:"imagePullPolicy,omitempty"`
+	Schedule                []PXCScheduledBackupSchedule  `json:"schedule,omitempty"`
+	Storages                map[string]*BackupStorageSpec `json:"storages,omitempty"`
+	ServiceAccountName      string                        `json:"serviceAccountName,omitempty"`
+	Annotations             map[string]string             `json:"annotations,omitempty"`
+	PITR                    PITRSpec                      `json:"pitr,omitempty"`
+	BackoffLimit            *int32                        `json:"backoffLimit,omitempty"`
+	ActiveDeadlineSeconds   *int64                        `json:"activeDeadlineSeconds,omitempty"`
+	StartingDeadlineSeconds *int64                        `json:"startingDeadlineSeconds,omitempty"`
 }
 
 func (b *PXCScheduledBackup) GetAllowParallel() bool {
diff --git a/pkg/apis/pxc/v1/zz_generated.deepcopy.go b/pkg/apis/pxc/v1/zz_generated.deepcopy.go
index cbd0004d2..e5d2c3bad 100644
--- a/pkg/apis/pxc/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/pxc/v1/zz_generated.deepcopy.go
@@ -401,6 +401,11 @@ func (in *PXCBackupSpec) DeepCopyInto(out *PXCBackupSpec) {
 		*out = new(BackupContainerOptions)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.StartingDeadlineSeconds != nil {
+		in, out := &in.StartingDeadlineSeconds, &out.StartingDeadlineSeconds
+		*out = new(int64)
+		**out = **in
+	}
 	if in.ActiveDeadlineSeconds != nil {
 		in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds
 		*out = new(int64)
@@ -519,6 +524,11 @@ func (in *PXCScheduledBackup) DeepCopyInto(out *PXCScheduledBackup) {
 		*out = new(int64)
 		**out = **in
 	}
+	if in.StartingDeadlineSeconds != nil {
+		in, out := &in.StartingDeadlineSeconds, &out.StartingDeadlineSeconds
+		*out = new(int64)
+		**out = **in
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PXCScheduledBackup.
diff --git a/pkg/controller/pxc/backup.go b/pkg/controller/pxc/backup.go
index 80a0d8442..1ae89632e 100644
--- a/pkg/controller/pxc/backup.go
+++ b/pkg/controller/pxc/backup.go
@@ -198,8 +198,9 @@ func (r *ReconcilePerconaXtraDBCluster) createBackupJob(ctx context.Context, cr
 			Labels: naming.LabelsScheduledBackup(cr, backupJob.Name),
 		},
 		Spec: api.PXCBackupSpec{
-			PXCCluster:  cr.Name,
-			StorageName: backupJob.StorageName,
+			PXCCluster:              cr.Name,
+			StorageName:             backupJob.StorageName,
+			StartingDeadlineSeconds: cr.Spec.Backup.StartingDeadlineSeconds,
 		},
 	}
 	err = r.client.Create(context.TODO(), bcp)
diff --git a/pkg/controller/pxc/controller.go b/pkg/controller/pxc/controller.go
index 7cb18926f..b5c7b8574 100644
--- a/pkg/controller/pxc/controller.go
+++ b/pkg/controller/pxc/controller.go
@@ -17,7 +17,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -26,7 +25,6 @@ import (
 	k8sretry "k8s.io/client-go/util/retry"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -34,6 +32,7 @@ import (
 	"github.com/percona/percona-xtradb-cluster-operator/clientcmd"
 	api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1"
+	"github.com/percona/percona-xtradb-cluster-operator/pkg/k8s"
 	"github.com/percona/percona-xtradb-cluster-operator/pkg/naming"
 	"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc"
 	"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app"
@@ -575,7 +574,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB
 			return errors.Wrap(err, "new autotune configmap")
 		}
 
-		err = setControllerReference(cr, configMap, r.scheme)
+		err = k8s.SetControllerReference(cr, configMap, r.scheme)
 		if err != nil {
 			return errors.Wrap(err, "set autotune configmap controller ref")
 		}
@@ -593,7 +592,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB
 	pxcConfigName := config.CustomConfigMapName(cr.Name, "pxc")
 	if cr.Spec.PXC.Configuration != "" {
 		configMap := config.NewConfigMap(cr, pxcConfigName, "init.cnf", cr.Spec.PXC.Configuration)
-		err := setControllerReference(cr, configMap, r.scheme)
+		err := k8s.SetControllerReference(cr, configMap, r.scheme)
 		if err != nil {
 			return errors.Wrap(err, "set controller ref")
 		}
@@ -660,7 +659,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB
 	if cr.Spec.ProxySQLEnabled() {
 		if cr.Spec.ProxySQL.Configuration != "" {
 			configMap := config.NewConfigMap(cr, proxysqlConfigName, "proxysql.cnf", cr.Spec.ProxySQL.Configuration)
-			err := setControllerReference(cr, configMap, r.scheme)
+			err := k8s.SetControllerReference(cr, configMap, r.scheme)
 			if err != nil {
 				return errors.Wrap(err, "set controller ref ProxySQL")
 			}
@@ -679,7 +678,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB
 	haproxyConfigName := config.CustomConfigMapName(cr.Name, "haproxy")
 	if cr.HAProxyEnabled() && cr.Spec.HAProxy.Configuration != "" {
 		configMap := config.NewConfigMap(cr, haproxyConfigName, "haproxy-global.cfg", cr.Spec.HAProxy.Configuration)
-		err := setControllerReference(cr, configMap, r.scheme)
+		err := k8s.SetControllerReference(cr, configMap, r.scheme)
 		if err != nil {
 			return errors.Wrap(err, "set controller ref HAProxy")
 		}
@@ -697,7 +696,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB
 	logCollectorConfigName := config.CustomConfigMapName(cr.Name, "logcollector")
 	if cr.Spec.LogCollector != nil && cr.Spec.LogCollector.Configuration != "" {
 		configMap := config.NewConfigMap(cr, logCollectorConfigName, "fluentbit_custom.conf", cr.Spec.LogCollector.Configuration)
-		err := setControllerReference(cr, configMap, r.scheme)
+		err := k8s.SetControllerReference(cr, configMap, r.scheme)
 		if err != nil {
 			return errors.Wrap(err, "set controller ref LogCollector")
 		}
@@ -716,7 +715,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileConfigMap(cr *api.PerconaXtraDB
 
 func (r *ReconcilePerconaXtraDBCluster) createHookScriptConfigMap(cr *api.PerconaXtraDBCluster, hookScript string, configMapName string) error {
 	configMap := config.NewConfigMap(cr, configMapName, "hook.sh", hookScript)
-	err := setControllerReference(cr, configMap, r.scheme)
+	err := k8s.SetControllerReference(cr, configMap, r.scheme)
 	if err != nil {
 		return errors.Wrap(err, "set controller ref")
 	}
@@ -742,7 +741,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcilePDB(ctx context.Context, cr *ap
 	}
 
 	pdb := pxc.PodDisruptionBudget(cr, spec, sfs.Labels())
-	if err := setControllerReference(sts, pdb, r.scheme); err != nil {
+	if err := k8s.SetControllerReference(sts, pdb, r.scheme); err != nil {
 		return errors.Wrap(err, "set owner reference")
 	}
 
@@ -984,38 +983,6 @@ func (r *ReconcilePerconaXtraDBCluster) deleteCerts(ctx context.Context, cr *api
 	return nil
 }
 
-func setControllerReference(ro runtime.Object, obj metav1.Object, scheme *runtime.Scheme) error {
-	ownerRef, err := OwnerRef(ro, scheme)
-	if err != nil {
-		return err
-	}
-	obj.SetOwnerReferences(append(obj.GetOwnerReferences(), ownerRef))
-	return nil
-}
-
-// OwnerRef returns OwnerReference to object
-func OwnerRef(ro runtime.Object, scheme *runtime.Scheme) (metav1.OwnerReference, error) {
-	gvk, err := apiutil.GVKForObject(ro, scheme)
-	if err != nil {
-		return metav1.OwnerReference{}, err
-	}
-
-	trueVar := true
-
-	ca, err := meta.Accessor(ro)
-	if err != nil {
-		return metav1.OwnerReference{}, err
-	}
-
-	return metav1.OwnerReference{
-		APIVersion: gvk.GroupVersion().String(),
-		Kind:       gvk.Kind,
-		Name:       ca.GetName(),
-		UID:        ca.GetUID(),
-		Controller: &trueVar,
-	}, nil
-}
-
 // resyncPXCUsersWithProxySQL calls the method of synchronizing users and makes sure that only one Goroutine works at a time
 func (r *ReconcilePerconaXtraDBCluster) resyncPXCUsersWithProxySQL(ctx context.Context, cr *api.PerconaXtraDBCluster) {
 	if !cr.Spec.ProxySQLEnabled() {
@@ -1182,7 +1149,7 @@ func mergeMaps(x, y map[string]string) map[string]string {
 }
 
 func (r *ReconcilePerconaXtraDBCluster) createOrUpdateService(ctx context.Context, cr *api.PerconaXtraDBCluster, svc *corev1.Service, saveOldMeta bool) error {
-	err := setControllerReference(cr, svc, r.scheme)
+	err := k8s.SetControllerReference(cr, svc, r.scheme)
 	if err != nil {
 		return errors.Wrap(err, "set controller reference")
 	}
diff --git a/pkg/controller/pxc/pitr.go b/pkg/controller/pxc/pitr.go
index dfe96196e..5147aeddd 100644
--- a/pkg/controller/pxc/pitr.go
+++ b/pkg/controller/pxc/pitr.go
@@ -25,7 +25,7 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileBinlogCollector(ctx context.Con
 		return errors.Wrapf(err, "get binlog collector deployment for cluster '%s'", cr.Name)
 	}
 
-	err = setControllerReference(cr, &collector, r.scheme)
+	err = k8s.SetControllerReference(cr, &collector, r.scheme)
 	if err != nil {
 		return errors.Wrapf(err, "set controller reference for binlog collector deployment '%s'", collector.Name)
 	}
diff --git a/pkg/controller/pxc/tls.go b/pkg/controller/pxc/tls.go
index 3a133d2ab..16c753597 100644
--- a/pkg/controller/pxc/tls.go
+++ b/pkg/controller/pxc/tls.go
@@ -278,7 +278,7 @@ func (r *ReconcilePerconaXtraDBCluster) createSSLManualy(cr *api.PerconaXtraDBCl
 	data["ca.crt"] = caCert
 	data["tls.crt"] = tlsCert
 	data["tls.key"] = key
-	owner, err := OwnerRef(cr, r.scheme)
+	owner, err := k8s.OwnerRef(cr, r.scheme)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/controller/pxc/upgrade.go b/pkg/controller/pxc/upgrade.go
index 0c232e2a4..f7f34b841 100644
--- a/pkg/controller/pxc/upgrade.go
+++ b/pkg/controller/pxc/upgrade.go
@@ -130,7 +130,7 @@ func (r *ReconcilePerconaXtraDBCluster) updatePod(ctx context.Context, sfs api.S
 	sts.Spec.Template.Annotations = annotations
 	sts.Spec.Template.Labels = labels
-	if err := setControllerReference(cr, sts, r.scheme); err != nil {
+	if err := k8s.SetControllerReference(cr, sts, r.scheme); err != nil {
 		return errors.Wrap(err, "set controller reference")
 	}
 	err = r.createOrUpdate(ctx, cr, sts)
@@ -168,12 +168,10 @@ func (r *ReconcilePerconaXtraDBCluster) smartUpdate(ctx context.Context, sfs api
 	}
 
 	if cr.HAProxyEnabled() && cr.Status.HAProxy.Status != api.AppStateReady {
-		log.V(1).Info("Waiting for HAProxy to be ready before smart update")
 		return nil
 	}
 
 	if cr.ProxySQLEnabled() && cr.Status.ProxySQL.Status != api.AppStateReady {
-		log.V(1).Info("Waiting for ProxySQL to be ready before smart update")
 		return nil
 	}
 
diff --git a/pkg/controller/pxcbackup/controller.go b/pkg/controller/pxcbackup/controller.go
index b321473ce..5485da244 100644
--- a/pkg/controller/pxcbackup/controller.go
+++ b/pkg/controller/pxcbackup/controller.go
@@ -14,10 +14,10 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -117,7 +117,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req // Fetch the PerconaXtraDBClusterBackup instance cr := &api.PerconaXtraDBClusterBackup{} - err := r.client.Get(context.TODO(), request.NamespacedName, cr) + err := r.client.Get(ctx, request.NamespacedName, cr) if err != nil { if k8sErrors.IsNotFound(err) { // Request object not found, could have been deleted after reconcile request. @@ -129,16 +129,21 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return reconcile.Result{}, err } + err = r.ensureFinalizers(ctx, cr) + if err != nil { + return reconcile.Result{}, errors.Wrap(err, "ensure finalizers") + } + err = r.tryRunBackupFinalizers(ctx, cr) if err != nil { - return reconcile.Result{}, errors.Wrap(err, "failed to run finalizers") + return reconcile.Result{}, errors.Wrap(err, "run finalizers") } - if cr.Status.State == api.BackupSucceeded || - cr.Status.State == api.BackupFailed { + if cr.Status.State == api.BackupSucceeded || cr.Status.State == api.BackupFailed { if len(cr.GetFinalizers()) > 0 { return rr, nil } + return reconcile.Result{}, nil } @@ -146,40 +151,79 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req return rr, nil } - cluster, err := r.getCluster(cr) + if err := r.checkStartingDeadline(ctx, cr); err != nil { + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, nil + } + + cluster, err := r.getCluster(ctx, cr) if err != nil { - log.Error(err, "invalid backup cluster") - return rr, nil + return reconcile.Result{}, errors.Wrap(err, "get cluster") } + log = log.WithValues("cluster", cluster.Name) + err = cluster.CheckNSetDefaults(r.serverVersion, log) if err != nil { - return rr, errors.Wrap(err, "wrong PXC options") + err := errors.Wrap(err, "wrong PXC options") + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err } if cluster.Spec.Backup == nil { - return rr, errors.New("a backup image should be set in the PXC config") + err := errors.New("a backup image should be set in the PXC config") + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err + } + + if err := r.reconcileBackupJob(ctx, cr, cluster); err != nil { + return rr, errors.Wrap(err, "reconcile backup job") } if err := cluster.CanBackup(); err != nil { - return rr, errors.Wrap(err, "failed to run backup") + log.Info("Cluster is not ready for backup", "reason", err.Error()) + + return rr, nil + } + + storage, ok := cluster.Spec.Backup.Storages[cr.Spec.StorageName] + if !ok { + err := errors.Errorf("storage %s doesn't exist", cr.Spec.StorageName) + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err } + log = log.WithValues("storage", cr.Spec.StorageName) + + log.V(1).Info("Check if parallel backups are allowed", "allowed", cluster.Spec.Backup.GetAllowParallel()) if !cluster.Spec.Backup.GetAllowParallel() { - isRunning, err := 
r.isOtherBackupRunning(ctx, cr, cluster) + lease, err := k8s.AcquireLease(ctx, r.client, naming.BackupLeaseName(cluster.Name), cr.Namespace, cr.Name) if err != nil { - return rr, errors.Wrap(err, "failed to check if other backups running") + return reconcile.Result{}, errors.Wrap(err, "acquire backup lock") } - if isRunning { - log.Info("backup already running, waiting until it's done") + + if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != cr.Name { + log.Info("Another backup is holding the lock", "holder", *lease.Spec.HolderIdentity) + return rr, nil } } - storage, ok := cluster.Spec.Backup.Storages[cr.Spec.StorageName] - if !ok { - return rr, errors.Errorf("storage %s doesn't exist", cr.Spec.StorageName) - } if cr.Status.S3 == nil || cr.Status.Azure == nil { cr.Status.S3 = storage.S3 cr.Status.Azure = storage.Azure @@ -191,15 +235,50 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req cr.Status.VerifyTLS = storage.VerifyTLS } + job, err := r.createBackupJob(ctx, cr, cluster, storage) + if err != nil { + err = errors.Wrap(err, "create backup job") + + if err := r.setFailedStatus(ctx, cr, err); err != nil { + return rr, errors.Wrap(err, "update status") + } + + return reconcile.Result{}, err + } + + err = r.updateJobStatus(ctx, cr, job, cr.Spec.StorageName, storage, cluster) + + switch cr.Status.State { + case api.BackupSucceeded, api.BackupFailed: + log.Info("Releasing backup lock", "lease", naming.BackupLeaseName(cluster.Name)) + + if err := k8s.ReleaseLease(ctx, r.client, naming.BackupLeaseName(cluster.Name), cr.Namespace); err != nil { + return reconcile.Result{}, errors.Wrap(err, "release backup lock") + } + + return reconcile.Result{}, nil + } + + return rr, err +} + +func (r *ReconcilePerconaXtraDBClusterBackup) createBackupJob( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, + storage *api.BackupStorageSpec, +) (*batchv1.Job, error) { + log := logf.FromContext(ctx) + bcp := backup.New(cluster) job := bcp.Job(cr, cluster) initImage, err := k8s.GetInitImage(ctx, cluster, r.client) if err != nil { - return rr, errors.Wrap(err, "failed to get initImage") + return nil, errors.Wrap(err, "failed to get initImage") } job.Spec, err = bcp.JobSpec(cr.Spec, cluster, job, initImage) if err != nil { - return rr, errors.Wrap(err, "can't create job spec") + return nil, errors.Wrap(err, "can't create job spec") } switch storage.Type { @@ -210,63 +289,77 @@ func (r *ReconcilePerconaXtraDBClusterBackup) Reconcile(ctx context.Context, req cr.Status.Destination.SetPVCDestination(pvc.Name) // Set PerconaXtraDBClusterBackup instance as the owner and controller - if err := setControllerReference(cr, pvc, r.scheme); err != nil { - return rr, errors.Wrap(err, "setControllerReference") + if err := k8s.SetControllerReference(cr, pvc, r.scheme); err != nil { + return nil, errors.Wrap(err, "setControllerReference") } // Check if this PVC already exists - err = r.client.Get(context.TODO(), types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, pvc) + err = r.client.Get(ctx, types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, pvc) if err != nil && k8sErrors.IsNotFound(err) { log.Info("Creating a new volume for backup", "Namespace", pvc.Namespace, "Name", pvc.Name) - err = r.client.Create(context.TODO(), pvc) + err = r.client.Create(ctx, pvc) if err != nil { - return rr, errors.Wrap(err, "create backup pvc") + return nil, errors.Wrap(err, "create backup pvc") } } else if err != nil { - return 
rr, errors.Wrap(err, "get backup pvc") + return nil, errors.Wrap(err, "get backup pvc") } err := backup.SetStoragePVC(&job.Spec, cr, pvc.Name) if err != nil { - return rr, errors.Wrap(err, "set storage FS") + return nil, errors.Wrap(err, "set storage FS") } case api.BackupStorageS3: if storage.S3 == nil { - return rr, errors.New("s3 storage is not specified") + return nil, errors.New("s3 storage is not specified") } cr.Status.Destination.SetS3Destination(storage.S3.Bucket, cr.Spec.PXCCluster+"-"+cr.CreationTimestamp.Time.Format("2006-01-02-15:04:05")+"-full") err := backup.SetStorageS3(&job.Spec, cr) if err != nil { - return rr, errors.Wrap(err, "set storage FS") + return nil, errors.Wrap(err, "set storage FS") } case api.BackupStorageAzure: if storage.Azure == nil { - return rr, errors.New("azure storage is not specified") + return nil, errors.New("azure storage is not specified") } cr.Status.Destination.SetAzureDestination(storage.Azure.ContainerPath, cr.Spec.PXCCluster+"-"+cr.CreationTimestamp.Time.Format("2006-01-02-15:04:05")+"-full") err := backup.SetStorageAzure(&job.Spec, cr) if err != nil { - return rr, errors.Wrap(err, "set storage FS for Azure") + return nil, errors.Wrap(err, "set storage FS for Azure") } } // Set PerconaXtraDBClusterBackup instance as the owner and controller - if err := setControllerReference(cr, job, r.scheme); err != nil { - return rr, errors.Wrap(err, "job/setControllerReference") + if err := k8s.SetControllerReference(cr, job, r.scheme); err != nil { + return nil, errors.Wrap(err, "job/setControllerReference") } - err = r.client.Create(context.TODO(), job) + err = r.client.Create(ctx, job) if err != nil && !k8sErrors.IsAlreadyExists(err) { - return rr, errors.Wrap(err, "create backup job") + return nil, errors.Wrap(err, "create backup job") } else if err == nil { - log.Info("Created a new backup job", "Namespace", job.Namespace, "Name", job.Name) + log.Info("Created a new backup job", "namespace", job.Namespace, "name", job.Name) } - err = r.updateJobStatus(cr, job, cr.Spec.StorageName, storage, cluster) + return job, nil +} - return rr, err +func (r *ReconcilePerconaXtraDBClusterBackup) ensureFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + for _, f := range cr.GetFinalizers() { + if f == naming.FinalizerReleaseLock { + return nil + } + } + + orig := cr.DeepCopy() + cr.SetFinalizers(append(cr.GetFinalizers(), naming.FinalizerReleaseLock)) + if err := r.client.Patch(ctx, cr.DeepCopy(), client.MergeFrom(orig)); err != nil { + return errors.Wrap(err, "patch finalizers") + } + + return nil } func (r *ReconcilePerconaXtraDBClusterBackup) tryRunBackupFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { @@ -282,7 +375,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) tryRunBackupFinalizers(ctx context return nil } - go r.runDeleteBackupFinalizer(ctx, cr) + go r.runBackupFinalizers(ctx, cr) default: if _, ok := r.bcpDeleteInProgress.Load(cr.Name); !ok { inprog := []string{} @@ -299,7 +392,7 @@ func (r *ReconcilePerconaXtraDBClusterBackup) tryRunBackupFinalizers(ctx context return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) runDeleteBackupFinalizer(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) { +func (r *ReconcilePerconaXtraDBClusterBackup) runBackupFinalizers(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) { log := logf.FromContext(ctx) defer func() { @@ -315,10 +408,10 @@ func (r *ReconcilePerconaXtraDBClusterBackup) runDeleteBackupFinalizer(ctx conte log.Info("The finalizer 
delete-s3-backup is deprecated and will be deleted in 1.18.0. Use percona.com/delete-backup") fallthrough case naming.FinalizerDeleteBackup: - if (cr.Status.S3 == nil && cr.Status.Azure == nil) || cr.Status.Destination == "" { continue } + switch cr.Status.GetStorageType(nil) { case api.BackupStorageS3: if cr.Status.Destination.StorageTypePrefix() != api.AwsBlobStoragePrefix { @@ -330,15 +423,24 @@ func (r *ReconcilePerconaXtraDBClusterBackup) runDeleteBackupFinalizer(ctx conte default: continue } + + if err != nil { + log.Info("failed to delete backup", "backup path", cr.Status.Destination, "error", err.Error()) + finalizers = append(finalizers, f) + continue + } + + log.Info("backup was removed", "name", cr.Name) + case naming.FinalizerReleaseLock: + err = r.runReleaseLockFinalizer(ctx, cr) + if err != nil { + log.Error(err, "failed to release backup lock") + finalizers = append(finalizers, f) + } default: finalizers = append(finalizers, f) } - if err != nil { - log.Info("failed to delete backup", "backup path", cr.Status.Destination, "error", err.Error()) - finalizers = append(finalizers, f) - } else if f == naming.FinalizerDeleteBackup || f == naming.FinalizerS3DeleteBackup { - log.Info("backup was removed", "name", cr.Name) - } + } cr.SetFinalizers(finalizers) @@ -409,6 +511,14 @@ func (r *ReconcilePerconaXtraDBClusterBackup) runAzureBackupFinalizer(ctx contex return nil } +func (r *ReconcilePerconaXtraDBClusterBackup) runReleaseLockFinalizer(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + err := k8s.ReleaseLease(ctx, r.client, naming.BackupLeaseName(cr.Spec.PXCCluster), cr.Namespace) + if k8sErrors.IsNotFound(err) { + return nil + } + return errors.Wrap(err, "release backup lock") +} + func removeBackupObjects(ctx context.Context, s storage.Storage, destination string) func() error { return func() error { blobs, err := s.ListObjects(ctx, destination) @@ -440,9 +550,9 @@ func removeBackupObjects(ctx context.Context, s storage.Storage, destination str } } -func (r *ReconcilePerconaXtraDBClusterBackup) getCluster(cr *api.PerconaXtraDBClusterBackup) (*api.PerconaXtraDBCluster, error) { +func (r *ReconcilePerconaXtraDBClusterBackup) getCluster(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) (*api.PerconaXtraDBCluster, error) { cluster := api.PerconaXtraDBCluster{} - err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: cr.Namespace, Name: cr.Spec.PXCCluster}, &cluster) + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: cr.Spec.PXCCluster}, &cluster) if err != nil { return nil, errors.Wrap(err, "get PXC cluster") } @@ -450,10 +560,17 @@ func (r *ReconcilePerconaXtraDBClusterBackup) getCluster(cr *api.PerconaXtraDBCl return &cluster, nil } -func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(bcp *api.PerconaXtraDBClusterBackup, job *batchv1.Job, - storageName string, storage *api.BackupStorageSpec, cluster *api.PerconaXtraDBCluster, +func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus( + ctx context.Context, + bcp *api.PerconaXtraDBClusterBackup, + job *batchv1.Job, + storageName string, + storage *api.BackupStorageSpec, + cluster *api.PerconaXtraDBCluster, ) error { - err := r.client.Get(context.TODO(), types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, job) + log := logf.FromContext(ctx).WithValues("job", job.Name) + + err := r.client.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, job) if err != nil { if k8sErrors.IsNotFound(err) { return nil @@ -500,20 +617,25 
@@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(bcp *api.PerconaXt bcp.Status = status - if status.State == api.BackupSucceeded { + switch status.State { + case api.BackupSucceeded: + log.Info("Backup succeeded") + if cluster.PITREnabled() { - collectorPod, err := binlogcollector.GetPod(context.TODO(), r.client, cluster) + collectorPod, err := binlogcollector.GetPod(ctx, r.client, cluster) if err != nil { return errors.Wrap(err, "get binlog collector pod") } - if err := binlogcollector.RemoveGapFile(context.TODO(), cluster, r.clientcmd, collectorPod); err != nil { + log.V(1).Info("Removing binlog gap file from binlog collector", "pod", collectorPod.Name) + if err := binlogcollector.RemoveGapFile(r.clientcmd, collectorPod); err != nil { if !errors.Is(err, binlogcollector.GapFileNotFound) { return errors.Wrap(err, "remove gap file") } } - if err := binlogcollector.RemoveTimelineFile(context.TODO(), cluster, r.clientcmd, collectorPod); err != nil { + log.V(1).Info("Removing binlog timeline file from binlog collector", "pod", collectorPod.Name) + if err := binlogcollector.RemoveTimelineFile(r.clientcmd, collectorPod); err != nil { return errors.Wrap(err, "remove timeline file") } } @@ -524,76 +646,195 @@ func (r *ReconcilePerconaXtraDBClusterBackup) updateJobStatus(bcp *api.PerconaXt Namespace: cluster.Namespace, }, } - if err := r.client.Delete(context.TODO(), &initSecret); client.IgnoreNotFound(err) != nil { + log.V(1).Info("Removing mysql-init secret", "secret", initSecret.Name) + if err := r.client.Delete(ctx, &initSecret); client.IgnoreNotFound(err) != nil { return errors.Wrap(err, "delete mysql-init secret") } + case api.BackupFailed: + log.Info("Backup failed") } - err = r.client.Status().Update(context.TODO(), bcp) - if err != nil { - return errors.Wrap(err, "send update") + if err := r.updateStatus(ctx, bcp); err != nil { + return errors.Wrap(err, "update status") } return nil } -func setControllerReference(cr *api.PerconaXtraDBClusterBackup, obj metav1.Object, scheme *runtime.Scheme) error { - ownerRef, err := cr.OwnerRef(scheme) - if err != nil { - return err +func (r *ReconcilePerconaXtraDBClusterBackup) checkStartingDeadline(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + log := logf.FromContext(ctx) + + since := time.Since(cr.CreationTimestamp.Time).Seconds() + + if cr.Spec.StartingDeadlineSeconds == nil { + return nil + } + + if since < float64(*cr.Spec.StartingDeadlineSeconds) { + return nil } - obj.SetOwnerReferences(append(obj.GetOwnerReferences(), ownerRef)) + + if cr.Status.State == api.BackupNew { + log.Info("Backup didn't start in startingDeadlineSeconds, failing the backup", + "startingDeadlineSeconds", *cr.Spec.StartingDeadlineSeconds, + "passedSeconds", since) + return errors.New("starting deadline seconds exceeded") + } + return nil } -func (r *ReconcilePerconaXtraDBClusterBackup) isOtherBackupRunning(ctx context.Context, cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) (bool, error) { - list := new(batchv1.JobList) - if err := r.client.List(ctx, list, &client.ListOptions{ - Namespace: cluster.Namespace, - LabelSelector: labels.SelectorFromSet(naming.LabelsBackup(cluster)), - }); err != nil { - return false, errors.Wrap(err, "list jobs") +func (r *ReconcilePerconaXtraDBClusterBackup) updateStatus(ctx context.Context, cr *api.PerconaXtraDBClusterBackup) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + localCr := new(api.PerconaXtraDBClusterBackup) + err := r.client.Get(ctx, 
client.ObjectKeyFromObject(cr), localCr) + if err != nil { + return err + } + + localCr.Status = cr.Status + + return r.client.Status().Update(ctx, localCr) + }) +} + +func (r *ReconcilePerconaXtraDBClusterBackup) setFailedStatus( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + err error, +) error { + cr.SetFailedStatusWithError(err) + return r.updateStatus(ctx, cr) +} + +func (r *ReconcilePerconaXtraDBClusterBackup) suspendJobIfNeeded( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if cluster.Spec.Unsafe.BackupIfUnhealthy { + return nil + } + + if cluster.Status.Status == api.AppStateReady { + return nil + } + + if cluster.Status.PXC.Ready == cluster.Status.PXC.Size { + return nil } - for _, job := range list.Items { - backupNameLabelKey := naming.LabelPerconaBackupName - if cluster.CompareVersionWith("1.16.0") < 0 { - backupNameLabelKey = "backup-name" + log := logf.FromContext(ctx) + + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + if k8sErrors.IsNotFound(err) { + return nil + } + return err } - if job.Labels[backupNameLabelKey] == cr.Name { - continue + + suspended := false + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue { + suspended = true + } } - if job.Status.Active == 0 && (jobSucceded(&job) || jobFailed(&job)) { - continue + + if suspended { + return nil } - return true, nil - } + log.Info("Suspending backup job", + "job", jobName, + "clusterStatus", cluster.Status.Status, + "readyPXC", cluster.Status.PXC.Ready) + + job.Spec.Suspend = ptr.To(true) + + err = r.client.Update(ctx, job) + if err != nil { + return err + } + + cr.Status.State = api.BackupSuspended + return r.updateStatus(ctx, cr) + }) - return false, nil + return err } -func jobFailed(job *batchv1.Job) bool { - failedCondition := findJobCondition(job.Status.Conditions, batchv1.JobFailed) - if failedCondition != nil && failedCondition.Status == corev1.ConditionTrue { - return true +func (r *ReconcilePerconaXtraDBClusterBackup) resumeJobIfNeeded( + ctx context.Context, + cr *api.PerconaXtraDBClusterBackup, + cluster *api.PerconaXtraDBCluster, +) error { + if cluster.Status.Status != api.AppStateReady { + return nil } - return false -} -func jobSucceded(job *batchv1.Job) bool { - succeededCondition := findJobCondition(job.Status.Conditions, batchv1.JobComplete) - if succeededCondition != nil && succeededCondition.Status == corev1.ConditionTrue { - return true + if cluster.Status.PXC.Ready != cluster.Status.PXC.Size { + return nil } - return false -} -func findJobCondition(conditions []batchv1.JobCondition, condType batchv1.JobConditionType) *batchv1.JobCondition { - for i, cond := range conditions { - if cond.Type == condType { - return &conditions[i] + log := logf.FromContext(ctx) + + labelKeyBackupType := naming.GetLabelBackupType(cluster) + jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron") + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + job := new(batchv1.Job) + + err := r.client.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: jobName}, job) + if err != nil { + if k8sErrors.IsNotFound(err) { + return 
+			}
+			return err
+		}
+
+		suspended := false
+		for _, cond := range job.Status.Conditions {
+			if cond.Type == batchv1.JobSuspended && cond.Status == corev1.ConditionTrue {
+				suspended = true
+			}
+		}
+
+		if !suspended {
+			return nil
 		}
+
+		log.Info("Resuming backup job",
+			"job", jobName,
+			"clusterStatus", cluster.Status.Status,
+			"readyPXC", cluster.Status.PXC.Ready)
+
+		job.Spec.Suspend = ptr.To(false)
+
+		return r.client.Update(ctx, job)
+	})
+
+	return err
+}
+
+func (r *ReconcilePerconaXtraDBClusterBackup) reconcileBackupJob(
+	ctx context.Context,
+	cr *api.PerconaXtraDBClusterBackup,
+	cluster *api.PerconaXtraDBCluster,
+) error {
+	if err := r.suspendJobIfNeeded(ctx, cr, cluster); err != nil {
+		return errors.Wrap(err, "suspend job if needed")
 	}
+
+	if err := r.resumeJobIfNeeded(ctx, cr, cluster); err != nil {
+		return errors.Wrap(err, "resume job if needed")
+	}
+
+	return nil
 }
diff --git a/pkg/k8s/k8s_suite_test.go b/pkg/k8s/k8s_suite_test.go
new file mode 100644
index 000000000..3985e3b74
--- /dev/null
+++ b/pkg/k8s/k8s_suite_test.go
@@ -0,0 +1,13 @@
+package k8s_test
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+func TestK8s(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "K8s Suite")
+}
diff --git a/pkg/k8s/lease.go b/pkg/k8s/lease.go
new file mode 100644
index 000000000..8eb7d6958
--- /dev/null
+++ b/pkg/k8s/lease.go
@@ -0,0 +1,56 @@
+package k8s
+
+import (
+	"context"
+	"time"
+
+	"github.com/pkg/errors"
+	coordv1 "k8s.io/api/coordination/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func AcquireLease(ctx context.Context, c client.Client, name, namespace, holder string) (*coordv1.Lease, error) {
+	lease := new(coordv1.Lease)
+
+	if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, lease); err != nil {
+		if !k8serrors.IsNotFound(err) {
+			return lease, errors.Wrap(err, "get lease")
+		}
+
+		lease := &coordv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: namespace,
+			},
+			Spec: coordv1.LeaseSpec{
+				AcquireTime:    &metav1.MicroTime{Time: time.Now()},
+				HolderIdentity: &holder,
+			},
+		}
+
+		if err := c.Create(ctx, lease); err != nil {
+			return lease, errors.Wrap(err, "create lease")
+		}
+
+		return lease, nil
+	}
+
+	return lease, nil
+}
+
+func ReleaseLease(ctx context.Context, c client.Client, name, namespace string) error {
+	lease := new(coordv1.Lease)
+
+	if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, lease); err != nil {
+		return errors.Wrap(err, "get lease")
+	}
+
+	if err := c.Delete(ctx, lease); err != nil {
+		return errors.Wrap(err, "delete lease")
+	}
+
+	return nil
+}
diff --git a/pkg/k8s/lease_test.go b/pkg/k8s/lease_test.go
new file mode 100644
index 000000000..873c1c756
--- /dev/null
+++ b/pkg/k8s/lease_test.go
@@ -0,0 +1,73 @@
+package k8s_test
+
+import (
+	"context"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"github.com/percona/percona-xtradb-cluster-operator/pkg/k8s"
+	coordv1 "k8s.io/api/coordination/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake" // nolint
+)
+
+var _ = Describe("Lease", func() {
+	It("should create a lease", func() {
+		cl := fake.NewFakeClient()
+
+		ctx := context.Background()
+
+		name := "backup-lock"
+		namespace := "test"
+		holder := "backup1"
+
+		lease, err := k8s.AcquireLease(ctx, cl, name, namespace, holder)
+		Expect(err).ToNot(HaveOccurred())
+
+		freshLease := new(coordv1.Lease)
+		nn := types.NamespacedName{
+			Name:      name,
+			Namespace: namespace,
+		}
+		err = cl.Get(ctx, nn, freshLease)
+		Expect(err).ToNot(HaveOccurred())
+		Expect(freshLease.Spec.AcquireTime).NotTo(BeNil())
+		Expect(freshLease.Spec.HolderIdentity).To(Equal(lease.Spec.HolderIdentity))
+	})
+
+	It("should delete a lease", func() {
+		ctx := context.Background()
+
+		name := "backup-lock"
+		namespace := "test"
+		holder := "backup1"
+
+		lease := &coordv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: namespace,
+			},
+			Spec: coordv1.LeaseSpec{
+				AcquireTime:    &metav1.MicroTime{Time: time.Now()},
+				HolderIdentity: &holder,
+			},
+		}
+
+		cl := fake.NewFakeClient(lease)
+
+		err := k8s.ReleaseLease(ctx, cl, name, namespace)
+		Expect(err).ToNot(HaveOccurred())
+
+		freshLease := new(coordv1.Lease)
+		nn := types.NamespacedName{
+			Name:      name,
+			Namespace: namespace,
+		}
+		err = cl.Get(ctx, nn, freshLease)
+		Expect(err).To(HaveOccurred())
+		Expect(k8serrors.IsNotFound(err)).To(BeTrue())
+	})
+})
diff --git a/pkg/naming/backup.go b/pkg/naming/backup.go
index c7e876d03..2d44941f9 100644
--- a/pkg/naming/backup.go
+++ b/pkg/naming/backup.go
@@ -9,6 +9,10 @@ import (
 	"k8s.io/apimachinery/pkg/util/validation"
 )
 
+func BackupLeaseName(clusterName string) string {
+	return "pxc-" + clusterName + "-backup-lock"
+}
+
 // BackupJobName generates legit name for backup resources.
 // k8s sets the `job-name` label for the created by job pod.
 // So we have to be sure that job name won't be longer than 63 symbols.
diff --git a/pkg/naming/labels.go b/pkg/naming/labels.go
index 8fe38bd66..f1f7d007e 100644
--- a/pkg/naming/labels.go
+++ b/pkg/naming/labels.go
@@ -30,6 +30,14 @@ const (
 	LabelPerconaRestoreJobName = perconaPrefix + "restore-job-name"
 )
 
+func GetLabelBackupType(cr *api.PerconaXtraDBCluster) string {
+	if cr.CompareVersionWith("1.16.0") < 0 {
+		return "type"
+	}
+
+	return LabelPerconaBackupType
+}
+
 func LabelsCluster(cr *api.PerconaXtraDBCluster) map[string]string {
 	return map[string]string{
 		LabelAppKubernetesName: "percona-xtradb-cluster",
diff --git a/pkg/naming/naming.go b/pkg/naming/naming.go
index 3dd8f58ba..67237ec2f 100644
--- a/pkg/naming/naming.go
+++ b/pkg/naming/naming.go
@@ -1,7 +1,8 @@
 package naming
 
 const (
-	annotationPrefix = "percona.com/"
+	annotationPrefix         = "percona.com/"
+	internalAnnotationPrefix = "internal." + annotationPrefix
 )
 
 const (
@@ -11,6 +12,7 @@ const (
 	FinalizerDeletePxcPvc   = annotationPrefix + "delete-pxc-pvc"
 	FinalizerDeleteBackup   = annotationPrefix + "delete-backup"
 	FinalizerS3DeleteBackup = "delete-s3-backup"
+	FinalizerReleaseLock    = internalAnnotationPrefix + "release-lock"
 )
 
 const (
diff --git a/pkg/pxc/app/binlogcollector/binlog-collector.go b/pkg/pxc/app/binlogcollector/binlog-collector.go
index 5eb2940ef..ac7b725ec 100644
--- a/pkg/pxc/app/binlogcollector/binlog-collector.go
+++ b/pkg/pxc/app/binlogcollector/binlog-collector.go
@@ -346,7 +346,7 @@ func GetPod(ctx context.Context, c client.Client, cr *api.PerconaXtraDBCluster)
 
 var GapFileNotFound = errors.New("gap file not found")
 
-func RemoveGapFile(ctx context.Context, cr *api.PerconaXtraDBCluster, c *clientcmd.Client, pod *corev1.Pod) error {
+func RemoveGapFile(c *clientcmd.Client, pod *corev1.Pod) error {
 	stderrBuf := &bytes.Buffer{}
 	err := c.Exec(pod, "pitr", []string{"/bin/bash", "-c", "rm /tmp/gap-detected"}, nil, nil, stderrBuf, false)
 	if err != nil {
@@ -359,7 +359,7 @@ func RemoveGapFile(ctx context.Context, cr *api.PerconaXtraDBCluster, c *clientc
 	return nil
 }
 
-func RemoveTimelineFile(ctx context.Context, cr *api.PerconaXtraDBCluster, c *clientcmd.Client, pod *corev1.Pod) error {
+func RemoveTimelineFile(c *clientcmd.Client, pod *corev1.Pod) error {
 	stderrBuf := &bytes.Buffer{}
 	err := c.Exec(pod, "pitr", []string{"/bin/bash", "-c", "rm /tmp/pitr-timeline"}, nil, nil, stderrBuf, false)
 	if err != nil {
diff --git a/pkg/pxc/backup/job.go b/pkg/pxc/backup/job.go
index 476156428..5da545083 100644
--- a/pkg/pxc/backup/job.go
+++ b/pkg/pxc/backup/job.go
@@ -19,11 +19,7 @@ import (
 )
 
 func (*Backup) Job(cr *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) *batchv1.Job {
-	labelKeyBackupType := "type"
-	if cluster.CompareVersionWith("1.16.0") >= 0 {
-		labelKeyBackupType = naming.LabelPerconaBackupType
-	}
-
+	labelKeyBackupType := naming.GetLabelBackupType(cluster)
 	jobName := naming.BackupJobName(cr.Name, cr.Labels[labelKeyBackupType] == "cron")
 
 	return &batchv1.Job{
diff --git a/pkg/pxc/backup/pitr.go b/pkg/pxc/backup/pitr.go
index 462f1118b..55e2f7770 100644
--- a/pkg/pxc/backup/pitr.go
+++ b/pkg/pxc/backup/pitr.go
@@ -89,7 +89,7 @@ func CheckPITRErrors(ctx context.Context, cl client.Client, clcmd *clientcmd.Cli
 		return errors.Wrap(err, "update backup status")
 	}
 
-	if err := binlogcollector.RemoveGapFile(ctx, cr, clcmd, collectorPod); err != nil {
+	if err := binlogcollector.RemoveGapFile(clcmd, collectorPod); err != nil {
 		if !errors.Is(err, binlogcollector.GapFileNotFound) {
 			return errors.Wrap(err, "remove gap file")
 		}