Skip to content

Commit

Permalink
Merge pull request wal-g#1502 from rdjjke/backup-push-failover-storages
Browse files Browse the repository at this point in the history
Support failover storages in `backup-push` and backups in `st transfer`
  • Loading branch information
rdjjke authored Jun 28, 2023
2 parents 0cbfc99 + be07ca5 commit aa3640e
Show file tree
Hide file tree
Showing 60 changed files with 2,080 additions and 931 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/dockertests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ jobs:
'make TEST="pg_backup_mark_permanent_test" pg_integration_test',
'make TEST="pg_config_test" pg_integration_test',
'make TEST="pg_crypto_test" pg_integration_test',
'make TEST="pg_copy_all_test" pg_integration_test',
'make TEST="pg_copy_backup_test" pg_integration_test',
'make TEST="pg_delta_backup_wal_delta_test" pg_integration_test',
'make TEST="pg_full_backup_test" pg_integration_test',
'make TEST="pg_full_backup_streamed_test" pg_integration_test',
Expand Down
14 changes: 7 additions & 7 deletions cmd/common/st/delete_object.go → cmd/common/st/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (
"github.com/wal-g/wal-g/pkg/storages/storage"
)

const deleteObjectShortDescription = "Delete the specified storage object"
const removeShortDescription = "Removes objects by the prefix from the specified storage"

// deleteObjectCmd represents the deleteObject command
var deleteObjectCmd = &cobra.Command{
Use: "rm relative_object_path",
Short: deleteObjectShortDescription,
// removeCmd represents the deleteObject command
var removeCmd = &cobra.Command{
Use: "rm prefix",
Short: removeShortDescription,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandleDeleteObject(args[0], folder)
return storagetools.HandleRemove(args[0], folder)
})
tracelog.ErrorLogger.FatalOnError(err)
},
}

func init() {
StorageToolsCmd.AddCommand(deleteObjectCmd)
StorageToolsCmd.AddCommand(removeCmd)
}
62 changes: 17 additions & 45 deletions cmd/common/st/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,22 @@ import (

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/storagetools"
)

const transferShortDescription = "Moves objects from one storage to another (Postgres only)"

// transferCmd represents the transfer command
var transferCmd = &cobra.Command{
Use: "transfer prefix --source='source_storage' [--target='target_storage']",
Use: "transfer",
Short: transferShortDescription,
Long: "The command is usually used to move objects from a failover storage to the primary one, when it becomes alive. " +
"By default, objects that exist in both storages are neither overwritten in the target storage nor deleted from the source one.",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
err := validateFlags()
Long: "The command allows to move objects between storages. It's usually used to sync the primary storage with " +
"a failover, when it becomes alive. By default, objects that exist in both storages are neither overwritten " +
"in the target storage nor deleted from the source one. (Postgres only)",
PersistentPreRunE: func(_ *cobra.Command, _ []string) error {
err := validateCommonFlags()
if err != nil {
tracelog.ErrorLogger.FatalError(fmt.Errorf("invalid flags: %w", err))
}

cfg := &storagetools.TransferHandlerConfig{
Prefix: args[0],
Overwrite: transferOverwrite,
FailOnFirstErr: transferFailFast,
Concurrency: transferConcurrency,
MaxFiles: adjustMaxFiles(transferMax),
AppearanceChecks: transferAppearanceChecks,
AppearanceChecksInterval: transferAppearanceChecksInterval,
}

handler, err := storagetools.NewTransferHandler(transferSourceStorage, targetStorage, cfg)
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}

err = handler.Handle()
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}
return nil
},
}

Expand All @@ -52,31 +31,31 @@ var (
transferOverwrite bool
transferFailFast bool
transferConcurrency int
transferMax int
transferMaxFiles uint
transferAppearanceChecks uint
transferAppearanceChecksInterval time.Duration
)

func init() {
transferCmd.Flags().StringVarP(&transferSourceStorage, "source", "s", "",
transferCmd.PersistentFlags().StringVarP(&transferSourceStorage, "source", "s", "",
"storage name to move files from. Use 'default' to select the primary storage")
transferCmd.Flags().BoolVarP(&transferOverwrite, "overwrite", "o", false,
transferCmd.PersistentFlags().BoolVarP(&transferOverwrite, "overwrite", "o", false,
"whether to overwrite already existing files in the target storage and remove them from the source one")
transferCmd.Flags().BoolVar(&transferFailFast, "fail-fast", false,
transferCmd.PersistentFlags().BoolVar(&transferFailFast, "fail-fast", false,
"if this flag is set, any error occurred with transferring a separate file will lead the whole command to stop immediately")
transferCmd.Flags().IntVarP(&transferConcurrency, "concurrency", "c", 10,
transferCmd.PersistentFlags().IntVarP(&transferConcurrency, "concurrency", "c", 10,
"number of concurrent workers to move files. Value 1 turns concurrency off")
transferCmd.Flags().IntVarP(&transferMax, "max", "m", -1,
"max number of files to move in this run. Negative numbers turn the limit off")
transferCmd.Flags().UintVar(&transferAppearanceChecks, "appearance-checks", 3,
transferCmd.PersistentFlags().UintVarP(&transferMaxFiles, "max-files", "m", math.MaxInt,
"max number of files to move in this run")
transferCmd.PersistentFlags().UintVar(&transferAppearanceChecks, "appearance-checks", 3,
"number of times to check if a file is appeared for reading in the target storage after writing it. Value 0 turns checking off")
transferCmd.Flags().DurationVar(&transferAppearanceChecksInterval, "appearance-checks-interval", time.Second,
transferCmd.PersistentFlags().DurationVar(&transferAppearanceChecksInterval, "appearance-checks-interval", time.Second,
"minimum time interval between performing checks for files to appear in the target storage")

StorageToolsCmd.AddCommand(transferCmd)
}

func validateFlags() error {
func validateCommonFlags() error {
if transferSourceStorage == "" {
return fmt.Errorf("source storage must be specified")
}
Expand All @@ -94,10 +73,3 @@ func validateFlags() error {
}
return nil
}

func adjustMaxFiles(max int) int {
if max < 0 {
return math.MaxInt
}
return max
}
48 changes: 48 additions & 0 deletions cmd/common/st/transfer_backups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package st

import (
"math"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/storagetools/transfer"
)

const backupsShortDescription = "Moves all backups from one storage to another"

// backupsCmd represents the backups command
var backupsCmd = &cobra.Command{
Use: "backups [backup_name] --source='source_storage' [--target='target_storage']",
Short: backupsShortDescription,
Args: cobra.RangeArgs(0, 1),
Run: func(_ *cobra.Command, args []string) {
var fileLister transfer.FileLister
if len(args) == 0 {
fileLister = transfer.NewAllBackupsFileLister(transferOverwrite, int(transferMaxFiles), int(transferMaxBackups))
} else {
fileLister = transfer.NewSingleBackupFileLister(args[0], transferOverwrite, int(transferMaxFiles))
}

cfg := &transfer.HandlerConfig{
FailOnFirstErr: transferFailFast,
Concurrency: transferConcurrency,
AppearanceChecks: transferAppearanceChecks,
AppearanceChecksInterval: transferAppearanceChecksInterval,
}

handler, err := transfer.NewHandler(transferSourceStorage, targetStorage, fileLister, cfg)
tracelog.ErrorLogger.FatalOnError(err)

err = handler.Handle()
tracelog.ErrorLogger.FatalOnError(err)
},
}

var transferMaxBackups uint

func init() {
backupsCmd.Flags().UintVar(&transferMaxBackups, "max-backups", math.MaxInt,
"max number of backups to move in this run. Is ignored if backup_name is specified")

transferCmd.AddCommand(backupsCmd)
}
40 changes: 40 additions & 0 deletions cmd/common/st/transfer_files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package st

import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/storagetools/transfer"
)

const filesShortDescription = "Moves all files by a prefix from one storage to another without any special treatment"

// filesCmd represents the files command
var filesCmd = &cobra.Command{
Use: "files prefix --source='source_storage' [--target='target_storage']",
Short: filesShortDescription,
Args: cobra.ExactArgs(1),
Run: func(_ *cobra.Command, args []string) {
transferFiles(args[0])
},
}

func transferFiles(prefix string) {
separateFileLister := transfer.NewRegularFileLister(prefix, transferOverwrite, int(transferMaxFiles))

cfg := &transfer.HandlerConfig{
FailOnFirstErr: transferFailFast,
Concurrency: transferConcurrency,
AppearanceChecks: transferAppearanceChecks,
AppearanceChecksInterval: transferAppearanceChecksInterval,
}

handler, err := transfer.NewHandler(transferSourceStorage, targetStorage, separateFileLister, cfg)
tracelog.ErrorLogger.FatalOnError(err)

err = handler.Handle()
tracelog.ErrorLogger.FatalOnError(err)
}

func init() {
transferCmd.AddCommand(filesCmd)
}
22 changes: 22 additions & 0 deletions cmd/common/st/transfer_pg_wals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package st

import (
"github.com/spf13/cobra"
"github.com/wal-g/wal-g/utility"
)

const pgWALsShortDescription = "Moves all PostgreSQL WAL files from one storage to another"

// pgWALsCmd represents the pg-wals command
var pgWALsCmd = &cobra.Command{
Use: "pg-wals --source='source_storage' [--target='target_storage']",
Short: pgWALsShortDescription,
Args: cobra.NoArgs,
Run: func(_ *cobra.Command, _ []string) {
transferFiles(utility.WalPath)
},
}

func init() {
transferCmd.AddCommand(pgWALsCmd)
}
15 changes: 3 additions & 12 deletions cmd/common/st/transfer_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package st

import (
"math"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_validateFlags(t *testing.T) {
func Test_validateCommonFlags(t *testing.T) {
tests := []struct {
name string
source, target string
Expand All @@ -26,15 +23,9 @@ func Test_validateFlags(t *testing.T) {
transferSourceStorage = tt.source
targetStorage = tt.target
transferConcurrency = tt.concurrency
if err := validateFlags(); (err != nil) != tt.wantErr {
t.Errorf("validateFlags() error = %v, wantErr %v", err, tt.wantErr)
if err := validateCommonFlags(); (err != nil) != tt.wantErr {
t.Errorf("validateCommonFlags() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func Test_adjustMaxFiles(t *testing.T) {
assert.Equal(t, math.MaxInt, adjustMaxFiles(-1))
assert.Equal(t, 0, adjustMaxFiles(0))
assert.Equal(t, 123, adjustMaxFiles(123))
}
5 changes: 4 additions & 1 deletion cmd/gp/backup_push_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ var (

greenplum.SetSegmentStoragePrefix(contentID)

uploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

dataDirectory := args[0]

if deltaFromName == "" {
Expand Down Expand Up @@ -56,7 +59,7 @@ var (
tarBallComposerType := postgres.RegularComposer
withoutFilesMetadata := false

arguments := postgres.NewBackupArguments(dataDirectory, utility.BaseBackupPath,
arguments := postgres.NewBackupArguments(uploader, dataDirectory, utility.BaseBackupPath,
permanent, verifyPageChecksums,
fullBackup, storeAllCorruptBlocks,
tarBallComposerType, greenplum.NewSegDeltaBackupConfigurator(deltaBaseSelector),
Expand Down
12 changes: 11 additions & 1 deletion cmd/pg/backup_push.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pg

import (
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/utility"

"github.com/wal-g/wal-g/internal/databases/postgres"
Expand Down Expand Up @@ -44,6 +45,15 @@ var (
Run: func(cmd *cobra.Command, args []string) {
internal.ConfigureLimiters()

baseUploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

failover, err := internal.InitFailoverStorages()
tracelog.ErrorLogger.FatalOnError(err)

uploader, err := multistorage.NewUploader(baseUploader, failover)
tracelog.ErrorLogger.FatalOnError(err)

var dataDirectory string

if len(args) > 0 {
Expand Down Expand Up @@ -88,7 +98,7 @@ var (
userData, err := internal.UnmarshalSentinelUserData(userDataRaw)
tracelog.ErrorLogger.FatalfOnError("Failed to unmarshal the provided UserData: %s", err)

arguments := postgres.NewBackupArguments(dataDirectory, utility.BaseBackupPath,
arguments := postgres.NewBackupArguments(uploader, dataDirectory, utility.BaseBackupPath,
permanent, verifyPageChecksums || viper.GetBool(internal.VerifyPageChecksumsSetting),
fullBackup, storeAllCorruptBlocks || viper.GetBool(internal.StoreAllCorruptBlocksSetting),
tarBallComposerType, postgres.NewRegularDeltaBackupConfigurator(deltaBaseSelector),
Expand Down
Loading

0 comments on commit aa3640e

Please sign in to comment.