Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: incremental backup cronJob #8828

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions controllers/apps/transformer_cluster_backup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,15 +679,19 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
}
hasSyncPITRMethod = true
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeIncremental && backup.IncrementalBackupEnabled != nil &&
!hasSyncIncMethod && len(backup.Method) > 0 && m.CompatibleMethod == backup.Method {
// auto-sync the first compatible incremental backup for the 'incrementalBackupEnabled' option.
mergeSchedulePolicy(&dpv1alpha1.SchedulePolicy{
Enabled: backup.IncrementalBackupEnabled,
RetentionPeriod: backup.RetentionPeriod,
CronExpression: backup.IncrementalCronExpression,
}, &backupSchedule.Spec.Schedules[i])
hasSyncIncMethod = true
if as.Spec.BackupType == dpv1alpha1.BackupTypeIncremental {
if len(backup.Method) == 0 || m.CompatibleMethod != backup.Method {
// disable other incremental backup schedules
backupSchedule.Spec.Schedules[i].Enabled = boolptr.False()
} else if backup.IncrementalBackupEnabled != nil && !hasSyncIncMethod {
// auto-sync the first compatible incremental backup for the 'incrementalBackupEnabled' option.
mergeSchedulePolicy(&dpv1alpha1.SchedulePolicy{
Enabled: backup.IncrementalBackupEnabled,
RetentionPeriod: backup.RetentionPeriod,
CronExpression: backup.IncrementalCronExpression,
}, &backupSchedule.Spec.Schedules[i])
hasSyncIncMethod = true
}
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeFull && enableAutoBackup {
// disable the automatic backup for other full backup method
Expand Down
43 changes: 43 additions & 0 deletions controllers/dataprotection/backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,49 @@ var _ = Describe("Backup Controller test", func() {

})

It("creates scheduled incremental backups", func() {
By("creating a unscheduled full backup from backupPolicy " + testdp.BackupPolicyName)
fullBackup1 := testdp.NewFakeBackup(&testCtx, func(backup *dpv1alpha1.Backup) {
backup.Name = "full-bakcup-1"
})
fullBackupKey1 := client.ObjectKeyFromObject(fullBackup1)
By("waiting for the full backup " + fullBackupKey1.String() + " to complete")
checkBackupCompleted(fullBackup1)
mockBackupStatus(fullBackup1, "", "")
By("creating a scheduled incremental backup " + incBackupName + "1")
incBackup1 := mockIncBackupAndComplete(true, incBackupName+"1", "", fullBackup1.Name, fullBackup1.Name)
By("creating a scheduled incremental backup " + incBackupName + "2")
_ = mockIncBackupAndComplete(true, incBackupName+"2", "", incBackup1.Name, fullBackup1.Name)
By("creating a scheduled full backup from backupPolicy " + testdp.BackupPolicyName)
fullBackup2 := testdp.NewFakeBackup(&testCtx, func(backup *dpv1alpha1.Backup) {
backup.Name = "full-bakcup-2"
backup.Labels[dptypes.BackupScheduleLabelKey] = scheduleName
})
fullBackupKey2 := client.ObjectKeyFromObject(fullBackup2)
By("waiting for the full backup " + fullBackupKey2.String() + " to complete")
checkBackupCompleted(fullBackup2)
mockBackupStatus(fullBackup2, "", "")
By("creating a scheduled incremental backup " + incBackupName + "3")
incBackup3 := mockIncBackupAndComplete(true, incBackupName+"3", "", fullBackup2.Name, fullBackup2.Name)
By("creating a scheduled incremental backup " + incBackupName + "4")
_ = mockIncBackupAndComplete(true, incBackupName+"4", "", incBackup3.Name, fullBackup2.Name)
By("creating a scheduled full backup from backupPolicy " + testdp.BackupPolicyName)
fullBackup3 := testdp.NewFakeBackup(&testCtx, func(backup *dpv1alpha1.Backup) {
backup.Name = "full-bakcup-3"
backup.Labels[dptypes.BackupScheduleLabelKey] = scheduleName
})
fullBackupKey3 := client.ObjectKeyFromObject(fullBackup3)
By("waiting for the full backup " + fullBackupKey3.String() + " to complete")
checkBackupCompleted(fullBackup3)
mockBackupStatus(fullBackup3, "", "")
By("creating a scheduled incremental backup " + incBackupName + "5")
incBackup5 := mockIncBackupAndComplete(true, incBackupName+"5", "", fullBackup3.Name, fullBackup3.Name)
By("creating a unscheduled incremental backup " + incBackupName + "6")
_ = mockIncBackupAndComplete(false, incBackupName+"6", "", incBackup5.Name, fullBackup3.Name)
By("creating a scheduled incremental backup " + incBackupName + "7")
_ = mockIncBackupAndComplete(true, incBackupName+"7", "", incBackup5.Name, fullBackup3.Name)
})

It("creates an incremental backup without valid parent backups", func() {
By("creating an incremental backup without specific parent backup")
incBackup1 := newFakeIncBackup(incBackupName+"1", "", false)
Expand Down
19 changes: 10 additions & 9 deletions controllers/dataprotection/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,16 @@ func FindParentBackupIfNotSet(ctx context.Context, cli client.Client, backup *dp
if err != nil {
return nil, err
}
// 3. prefer the latest backup; if it is an incremental backup, it should be based on the latest full backup.
// 3. get the latest unscheduled full backup if scheduled full backups not found
// no scheduled incremental backups or some based on unscheduled full backup
if len(scheduleName) != 0 && latestFullBackup == nil {
delete(labelMap, dptypes.BackupScheduleLabelKey)
latestFullBackup, err = getLatestParentBackup(labelMap, false)
if err != nil {
return nil, err
}
}
// 4. prefer the latest backup; if it is an incremental backup, it should be based on the latest full backup.
if latestIncrementalBackup != nil && latestFullBackup != nil {
if !dputils.CompareWithBackupStopTime(*latestIncrementalBackup, *latestFullBackup) &&
latestIncrementalBackup.Status.BaseBackupName == latestFullBackup.Name {
Expand All @@ -682,14 +691,6 @@ func FindParentBackupIfNotSet(ctx context.Context, cli client.Client, backup *dp
// or the latest full backup is newer than the base backup of the latest incremental backup
return latestFullBackup, nil
}
// 4. get the latest unscheduled full backup if scheduled backups not found
if len(scheduleName) != 0 && latestFullBackup == nil {
delete(labelMap, dptypes.BackupScheduleLabelKey)
latestFullBackup, err = getLatestParentBackup(labelMap, false)
if err != nil {
return nil, err
}
}
// 5. only full backup found
if latestFullBackup != nil {
return latestFullBackup, nil
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/templates/rbac/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ rules:
verbs:
- create
- get
- list
- patch
- update
- apiGroups:
Expand Down
42 changes: 40 additions & 2 deletions pkg/dataprotection/backup/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"slices"
"sort"
"strings"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -174,7 +175,11 @@ func (s *Scheduler) buildCronJob(schedulePolicy *dpv1alpha1.SchedulePolicy, cron

func (s *Scheduler) buildPodSpec(schedulePolicy *dpv1alpha1.SchedulePolicy) (*corev1.PodSpec, error) {
// TODO(ldm): add backup deletionPolicy
createBackupCmd := fmt.Sprintf(`
checkCommand, err := s.buildCheckCommand(schedulePolicy)
if err != nil {
return nil, err
}
createBackupCmd := fmt.Sprintf(`%s
kubectl create -f - <<EOF
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: Backup
Expand All @@ -189,7 +194,7 @@ spec:
backupMethod: %s
retentionPeriod: %s
EOF
`, s.BackupSchedule.Name, s.generateBackupName(schedulePolicy), s.BackupSchedule.Namespace,
`, checkCommand, s.BackupSchedule.Name, s.generateBackupName(schedulePolicy), s.BackupSchedule.Namespace,
s.BackupPolicy.Name, schedulePolicy.BackupMethod,
schedulePolicy.RetentionPeriod)

Expand Down Expand Up @@ -472,3 +477,36 @@ func (s *Scheduler) reconcileReconfigure(backupSchedule *dpv1alpha1.BackupSchedu
}
return nil
}

func (s *Scheduler) buildCheckCommand(schedulePolicy *dpv1alpha1.SchedulePolicy) (string, error) {
backupMethod := dputils.GetBackupMethodByName(schedulePolicy.BackupMethod, s.BackupPolicy)
actionSet, err := dputils.GetActionSetByName(s.RequestCtx, s.Client, backupMethod.ActionSetName)
if err != nil {
return "", err
}
// command is used by incremental backup
if backupType := dputils.GetBackupType(actionSet, backupMethod.SnapshotVolumes); backupType != dpv1alpha1.BackupTypeIncremental {
return "", nil
}
// filter completed full backup, if there is no completed full backup, exit.
labelMap := map[string]string{
dptypes.BackupPolicyLabelKey: s.BackupSchedule.Spec.BackupPolicyName,
dptypes.BackupTypeLabelKey: string(dpv1alpha1.BackupTypeFull),
}
labelSlice := []string{}
for k, v := range labelMap {
labelSlice = append(labelSlice, fmt.Sprintf("%s=%s", k, v))
}
checkCommand := fmt.Sprintf(`
count=$(kubectl get backups.dataprotection.kubeblocks.io -n %s --selector=%s -o jsonpath='{range .items[?(@.spec.backupMethod=="%s")]}{.status.phase}{"\n"}{end}' | grep "Completed" | wc -l)
if [ "$count" -eq 0 ]; then
echo "No completed full backups found. Exiting."
exit 0
fi
`,
s.BackupSchedule.Namespace,
strings.Join(labelSlice, ","),
backupMethod.CompatibleMethod,
)
return checkCommand, nil
}
Loading