diff --git a/.github/workflows/dockertests.yml b/.github/workflows/dockertests.yml index 280ab26a4..0f6725ae3 100644 --- a/.github/workflows/dockertests.yml +++ b/.github/workflows/dockertests.yml @@ -76,6 +76,8 @@ jobs: 'make TEST="pg_backup_mark_permanent_test" pg_integration_test', 'make TEST="pg_config_test" pg_integration_test', 'make TEST="pg_crypto_test" pg_integration_test', + 'make TEST="pg_copy_all_test" pg_integration_test', + 'make TEST="pg_copy_backup_test" pg_integration_test', 'make TEST="pg_delta_backup_wal_delta_test" pg_integration_test', 'make TEST="pg_full_backup_test" pg_integration_test', 'make TEST="pg_full_backup_streamed_test" pg_integration_test', diff --git a/cmd/common/st/delete_object.go b/cmd/common/st/remove.go similarity index 57% rename from cmd/common/st/delete_object.go rename to cmd/common/st/remove.go index 3e3440220..0f9247ee0 100644 --- a/cmd/common/st/delete_object.go +++ b/cmd/common/st/remove.go @@ -8,21 +8,21 @@ import ( "github.com/wal-g/wal-g/pkg/storages/storage" ) -const deleteObjectShortDescription = "Delete the specified storage object" +const removeShortDescription = "Removes objects by the prefix from the specified storage" -// deleteObjectCmd represents the deleteObject command -var deleteObjectCmd = &cobra.Command{ - Use: "rm relative_object_path", - Short: deleteObjectShortDescription, +// removeCmd represents the deleteObject command +var removeCmd = &cobra.Command{ + Use: "rm prefix", + Short: removeShortDescription, Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { - return storagetools.HandleDeleteObject(args[0], folder) + return storagetools.HandleRemove(args[0], folder) }) tracelog.ErrorLogger.FatalOnError(err) }, } func init() { - StorageToolsCmd.AddCommand(deleteObjectCmd) + StorageToolsCmd.AddCommand(removeCmd) } diff --git a/cmd/common/st/transfer.go b/cmd/common/st/transfer.go index 76b095e9d..f2f3da59b 100644 --- a/cmd/common/st/transfer.go +++ b/cmd/common/st/transfer.go @@ -7,43 +7,22 @@ import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal/storagetools" ) const transferShortDescription = "Moves objects from one storage to another (Postgres only)" -// transferCmd represents the transfer command var transferCmd = &cobra.Command{ - Use: "transfer prefix --source='source_storage' [--target='target_storage']", + Use: "transfer", Short: transferShortDescription, - Long: "The command is usually used to move objects from a failover storage to the primary one, when it becomes alive. " + - "By default, objects that exist in both storages are neither overwritten in the target storage nor deleted from the source one.", - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - err := validateFlags() + Long: "The command allows to move objects between storages. It's usually used to sync the primary storage with " + + "a failover, when it becomes alive. By default, objects that exist in both storages are neither overwritten " + + "in the target storage nor deleted from the source one. 
(Postgres only)", + PersistentPreRunE: func(_ *cobra.Command, _ []string) error { + err := validateCommonFlags() if err != nil { tracelog.ErrorLogger.FatalError(fmt.Errorf("invalid flags: %w", err)) } - - cfg := &storagetools.TransferHandlerConfig{ - Prefix: args[0], - Overwrite: transferOverwrite, - FailOnFirstErr: transferFailFast, - Concurrency: transferConcurrency, - MaxFiles: adjustMaxFiles(transferMax), - AppearanceChecks: transferAppearanceChecks, - AppearanceChecksInterval: transferAppearanceChecksInterval, - } - - handler, err := storagetools.NewTransferHandler(transferSourceStorage, targetStorage, cfg) - if err != nil { - tracelog.ErrorLogger.FatalError(err) - } - - err = handler.Handle() - if err != nil { - tracelog.ErrorLogger.FatalError(err) - } + return nil }, } @@ -52,31 +31,31 @@ var ( transferOverwrite bool transferFailFast bool transferConcurrency int - transferMax int + transferMaxFiles uint transferAppearanceChecks uint transferAppearanceChecksInterval time.Duration ) func init() { - transferCmd.Flags().StringVarP(&transferSourceStorage, "source", "s", "", + transferCmd.PersistentFlags().StringVarP(&transferSourceStorage, "source", "s", "", "storage name to move files from. Use 'default' to select the primary storage") - transferCmd.Flags().BoolVarP(&transferOverwrite, "overwrite", "o", false, + transferCmd.PersistentFlags().BoolVarP(&transferOverwrite, "overwrite", "o", false, "whether to overwrite already existing files in the target storage and remove them from the source one") - transferCmd.Flags().BoolVar(&transferFailFast, "fail-fast", false, + transferCmd.PersistentFlags().BoolVar(&transferFailFast, "fail-fast", false, "if this flag is set, any error occurred with transferring a separate file will lead the whole command to stop immediately") - transferCmd.Flags().IntVarP(&transferConcurrency, "concurrency", "c", 10, + transferCmd.PersistentFlags().IntVarP(&transferConcurrency, "concurrency", "c", 10, "number of concurrent workers to move files. Value 1 turns concurrency off") - transferCmd.Flags().IntVarP(&transferMax, "max", "m", -1, - "max number of files to move in this run. Negative numbers turn the limit off") - transferCmd.Flags().UintVar(&transferAppearanceChecks, "appearance-checks", 3, + transferCmd.PersistentFlags().UintVarP(&transferMaxFiles, "max-files", "m", math.MaxInt, + "max number of files to move in this run") + transferCmd.PersistentFlags().UintVar(&transferAppearanceChecks, "appearance-checks", 3, "number of times to check if a file is appeared for reading in the target storage after writing it. 
Value 0 turns checking off") - transferCmd.Flags().DurationVar(&transferAppearanceChecksInterval, "appearance-checks-interval", time.Second, + transferCmd.PersistentFlags().DurationVar(&transferAppearanceChecksInterval, "appearance-checks-interval", time.Second, "minimum time interval between performing checks for files to appear in the target storage") StorageToolsCmd.AddCommand(transferCmd) } -func validateFlags() error { +func validateCommonFlags() error { if transferSourceStorage == "" { return fmt.Errorf("source storage must be specified") } @@ -94,10 +73,3 @@ func validateFlags() error { } return nil } - -func adjustMaxFiles(max int) int { - if max < 0 { - return math.MaxInt - } - return max -} diff --git a/cmd/common/st/transfer_backups.go b/cmd/common/st/transfer_backups.go new file mode 100644 index 000000000..94044ccaf --- /dev/null +++ b/cmd/common/st/transfer_backups.go @@ -0,0 +1,48 @@ +package st + +import ( + "math" + + "github.com/spf13/cobra" + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal/storagetools/transfer" +) + +const backupsShortDescription = "Moves all backups from one storage to another" + +// backupsCmd represents the backups command +var backupsCmd = &cobra.Command{ + Use: "backups [backup_name] --source='source_storage' [--target='target_storage']", + Short: backupsShortDescription, + Args: cobra.RangeArgs(0, 1), + Run: func(_ *cobra.Command, args []string) { + var fileLister transfer.FileLister + if len(args) == 0 { + fileLister = transfer.NewAllBackupsFileLister(transferOverwrite, int(transferMaxFiles), int(transferMaxBackups)) + } else { + fileLister = transfer.NewSingleBackupFileLister(args[0], transferOverwrite, int(transferMaxFiles)) + } + + cfg := &transfer.HandlerConfig{ + FailOnFirstErr: transferFailFast, + Concurrency: transferConcurrency, + AppearanceChecks: transferAppearanceChecks, + AppearanceChecksInterval: transferAppearanceChecksInterval, + } + + handler, err := transfer.NewHandler(transferSourceStorage, targetStorage, fileLister, cfg) + tracelog.ErrorLogger.FatalOnError(err) + + err = handler.Handle() + tracelog.ErrorLogger.FatalOnError(err) + }, +} + +var transferMaxBackups uint + +func init() { + backupsCmd.Flags().UintVar(&transferMaxBackups, "max-backups", math.MaxInt, + "max number of backups to move in this run. 
Is ignored if backup_name is specified") + + transferCmd.AddCommand(backupsCmd) +} diff --git a/cmd/common/st/transfer_files.go b/cmd/common/st/transfer_files.go new file mode 100644 index 000000000..dfd56d458 --- /dev/null +++ b/cmd/common/st/transfer_files.go @@ -0,0 +1,40 @@ +package st + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal/storagetools/transfer" +) + +const filesShortDescription = "Moves all files by a prefix from one storage to another without any special treatment" + +// filesCmd represents the files command +var filesCmd = &cobra.Command{ + Use: "files prefix --source='source_storage' [--target='target_storage']", + Short: filesShortDescription, + Args: cobra.ExactArgs(1), + Run: func(_ *cobra.Command, args []string) { + transferFiles(args[0]) + }, +} + +func transferFiles(prefix string) { + separateFileLister := transfer.NewRegularFileLister(prefix, transferOverwrite, int(transferMaxFiles)) + + cfg := &transfer.HandlerConfig{ + FailOnFirstErr: transferFailFast, + Concurrency: transferConcurrency, + AppearanceChecks: transferAppearanceChecks, + AppearanceChecksInterval: transferAppearanceChecksInterval, + } + + handler, err := transfer.NewHandler(transferSourceStorage, targetStorage, separateFileLister, cfg) + tracelog.ErrorLogger.FatalOnError(err) + + err = handler.Handle() + tracelog.ErrorLogger.FatalOnError(err) +} + +func init() { + transferCmd.AddCommand(filesCmd) +} diff --git a/cmd/common/st/transfer_pg_wals.go b/cmd/common/st/transfer_pg_wals.go new file mode 100644 index 000000000..153c4bcec --- /dev/null +++ b/cmd/common/st/transfer_pg_wals.go @@ -0,0 +1,22 @@ +package st + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/wal-g/utility" +) + +const pgWALsShortDescription = "Moves all PostgreSQL WAL files from one storage to another" + +// pgWALsCmd represents the pg-wals command +var pgWALsCmd = &cobra.Command{ + Use: "pg-wals --source='source_storage' [--target='target_storage']", + Short: pgWALsShortDescription, + Args: cobra.NoArgs, + Run: func(_ *cobra.Command, _ []string) { + transferFiles(utility.WalPath) + }, +} + +func init() { + transferCmd.AddCommand(pgWALsCmd) +} diff --git a/cmd/common/st/transfer_test.go b/cmd/common/st/transfer_test.go index 2c7b50b9d..2c467eb75 100644 --- a/cmd/common/st/transfer_test.go +++ b/cmd/common/st/transfer_test.go @@ -1,13 +1,10 @@ package st import ( - "math" "testing" - - "github.com/stretchr/testify/assert" ) -func Test_validateFlags(t *testing.T) { +func Test_validateCommonFlags(t *testing.T) { tests := []struct { name string source, target string @@ -26,15 +23,9 @@ func Test_validateFlags(t *testing.T) { transferSourceStorage = tt.source targetStorage = tt.target transferConcurrency = tt.concurrency - if err := validateFlags(); (err != nil) != tt.wantErr { - t.Errorf("validateFlags() error = %v, wantErr %v", err, tt.wantErr) + if err := validateCommonFlags(); (err != nil) != tt.wantErr { + t.Errorf("validateCommonFlags() error = %v, wantErr %v", err, tt.wantErr) } }) } } - -func Test_adjustMaxFiles(t *testing.T) { - assert.Equal(t, math.MaxInt, adjustMaxFiles(-1)) - assert.Equal(t, 0, adjustMaxFiles(0)) - assert.Equal(t, 123, adjustMaxFiles(123)) -} diff --git a/cmd/gp/backup_push_segment.go b/cmd/gp/backup_push_segment.go index 7dcc1d8f8..885f466f2 100644 --- a/cmd/gp/backup_push_segment.go +++ b/cmd/gp/backup_push_segment.go @@ -28,6 +28,9 @@ var ( greenplum.SetSegmentStoragePrefix(contentID) + uploader, err := internal.ConfigureUploader() + 
tracelog.ErrorLogger.FatalOnError(err) + dataDirectory := args[0] if deltaFromName == "" { @@ -56,7 +59,7 @@ var ( tarBallComposerType := postgres.RegularComposer withoutFilesMetadata := false - arguments := postgres.NewBackupArguments(dataDirectory, utility.BaseBackupPath, + arguments := postgres.NewBackupArguments(uploader, dataDirectory, utility.BaseBackupPath, permanent, verifyPageChecksums, fullBackup, storeAllCorruptBlocks, tarBallComposerType, greenplum.NewSegDeltaBackupConfigurator(deltaBaseSelector), diff --git a/cmd/pg/backup_push.go b/cmd/pg/backup_push.go index 5794ff757..5fc0e8c9d 100644 --- a/cmd/pg/backup_push.go +++ b/cmd/pg/backup_push.go @@ -1,6 +1,7 @@ package pg import ( + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/utility" "github.com/wal-g/wal-g/internal/databases/postgres" @@ -44,6 +45,15 @@ var ( Run: func(cmd *cobra.Command, args []string) { internal.ConfigureLimiters() + baseUploader, err := internal.ConfigureUploader() + tracelog.ErrorLogger.FatalOnError(err) + + failover, err := internal.InitFailoverStorages() + tracelog.ErrorLogger.FatalOnError(err) + + uploader, err := multistorage.NewUploader(baseUploader, failover) + tracelog.ErrorLogger.FatalOnError(err) + var dataDirectory string if len(args) > 0 { @@ -88,7 +98,7 @@ var ( userData, err := internal.UnmarshalSentinelUserData(userDataRaw) tracelog.ErrorLogger.FatalfOnError("Failed to unmarshal the provided UserData: %s", err) - arguments := postgres.NewBackupArguments(dataDirectory, utility.BaseBackupPath, + arguments := postgres.NewBackupArguments(uploader, dataDirectory, utility.BaseBackupPath, permanent, verifyPageChecksums || viper.GetBool(internal.VerifyPageChecksumsSetting), fullBackup, storeAllCorruptBlocks || viper.GetBool(internal.StoreAllCorruptBlocksSetting), tarBallComposerType, postgres.NewRegularDeltaBackupConfigurator(deltaBaseSelector), diff --git a/docker-compose.yml b/docker-compose.yml index 4f92be90b..063bc0b36 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -101,10 +101,22 @@ services: && mkdir -p /export/gppartialtablebucket && mkdir -p /export/createrestorepointbucket && mkdir -p /export/storagetoolsbucket - && mkdir -p /export/sttransferbucket - && mkdir -p /export/sttransferfailoverbucket + && mkdir -p /export/sttransferfilesbucket + && mkdir -p /export/sttransferfilesfailoverbucket + && mkdir -p /export/transferbackupbucket && mkdir -p /export/walrestorebucket - && mkdir -p /export/daemonbucket + && mkdir -p /export/daemonbucket + && mkdir -p /export/backupmarkimpermanentbucket + && mkdir -p /export/backupmarkpermanentnoerrorbucket + && mkdir -p /export/backupmarkpermanentbucket + && mkdir -p /export/backupperftestbucket + && mkdir -p /export/catchupbucket + && mkdir -p /export/deletetargetdeltafindfullbucket + && mkdir -p /export/deletetargetdeltabucket + && mkdir -p /export/fullstreamedbucket + && mkdir -p /export/receivewalbucket + && mkdir -p /export/severaldeltabackupsbucket + && mkdir -p /export/walperftestbucket && /usr/bin/minio server /export' s3-another: @@ -141,6 +153,10 @@ services: context: . image: wal-g/pg_tests container_name: wal-g_pg_tests + depends_on: + - s3 + links: + - s3 pg_build_docker_prefix: build: @@ -484,6 +500,28 @@ services: links: - s3 + pg_copy_all_test: + build: + dockerfile: docker/pg_tests/Dockerfile_copy_all_test + context: . 
+ image: wal-g/copy_all_test + container_name: wal-g_pg_copy_all_test + depends_on: + - s3 + links: + - s3 + + pg_copy_backup_test: + build: + dockerfile: docker/pg_tests/Dockerfile_copy_backup_test + context: . + image: wal-g/copy_backup_test + container_name: wal-g_pg_copy_backup_test + depends_on: + - s3 + links: + - s3 + pg_delete_end_to_end_test: build: dockerfile: docker/pg_tests/Dockerfile_delete_end_to_end_test @@ -600,6 +638,17 @@ services: links: - s3 + pg_transfer_backup_test: + build: + dockerfile: docker/pg_tests/Dockerfile_transfer_backup_test + context: . + image: wal-g/transfer_backup_test + container_name: wal-g_pg_transfer_backup_test + depends_on: + - s3 + links: + - s3 + pg_catchup_test: build: dockerfile: docker/pg_tests/Dockerfile_catchup_test diff --git a/docker/pg_tests/Dockerfile b/docker/pg_tests/Dockerfile index 9a2b38f4c..957a0e5ac 100644 --- a/docker/pg_tests/Dockerfile +++ b/docker/pg_tests/Dockerfile @@ -2,4 +2,6 @@ FROM wal-g/docker_prefix:latest COPY docker/pg_tests/scripts/ / +RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y netcat-openbsd && apt-get clean + CMD su postgres -c "/tmp/run_integration_tests.sh" \ No newline at end of file diff --git a/docker/pg_tests/Dockerfile_copy_all_test b/docker/pg_tests/Dockerfile_copy_all_test new file mode 100644 index 000000000..4f34c4773 --- /dev/null +++ b/docker/pg_tests/Dockerfile_copy_all_test @@ -0,0 +1,3 @@ +FROM wal-g/docker_prefix:latest + +CMD su postgres -c "/tmp/tests/copy_all_test.sh" diff --git a/docker/pg_tests/Dockerfile_copy_backup_test b/docker/pg_tests/Dockerfile_copy_backup_test new file mode 100644 index 000000000..b551c449a --- /dev/null +++ b/docker/pg_tests/Dockerfile_copy_backup_test @@ -0,0 +1,3 @@ +FROM wal-g/docker_prefix:latest + +CMD su postgres -c "/tmp/tests/copy_backup_test.sh" diff --git a/docker/pg_tests/Dockerfile_daemon_test b/docker/pg_tests/Dockerfile_daemon_test index da78c7ebe..263eb8cf9 100644 --- a/docker/pg_tests/Dockerfile_daemon_test +++ b/docker/pg_tests/Dockerfile_daemon_test @@ -2,4 +2,4 @@ FROM wal-g/docker_prefix:latest RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y netcat-openbsd && apt-get clean -CMD su - postgres /tmp/tests/daemon_test.sh +CMD su postgres /tmp/tests/daemon_test.sh diff --git a/docker/pg_tests/Dockerfile_transfer_backup_test b/docker/pg_tests/Dockerfile_transfer_backup_test new file mode 100644 index 000000000..ba6e561d4 --- /dev/null +++ b/docker/pg_tests/Dockerfile_transfer_backup_test @@ -0,0 +1,3 @@ +FROM wal-g/docker_prefix:latest + +CMD su postgres -c "/tmp/tests/transfer_backup_test.sh" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/backup_mark_impermanent_test_config.json b/docker/pg_tests/scripts/configs/backup_mark_impermanent_test_config.json index aa26ae590..c0e9e62f6 100755 --- a/docker/pg_tests/scripts/configs/backup_mark_impermanent_test_config.json +++ b/docker/pg_tests/scripts/configs/backup_mark_impermanent_test_config.json @@ -1,3 +1,3 @@ "WALG_DELTA_MAX_STEPS": "0", -"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket", +"WALE_S3_PREFIX": "s3://backupmarkimpermanentbucket", "WALG_USE_WAL_DELTA": "true" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/backup_mark_permanent_no_error_test_config.json b/docker/pg_tests/scripts/configs/backup_mark_permanent_no_error_test_config.json index 415fd935d..74a416632 100755 --- a/docker/pg_tests/scripts/configs/backup_mark_permanent_no_error_test_config.json +++ 
b/docker/pg_tests/scripts/configs/backup_mark_permanent_no_error_test_config.json @@ -1,3 +1,3 @@ "WALG_DELTA_MAX_STEPS": "1000", -"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket", +"WALE_S3_PREFIX": "s3://backupmarkpermanentnoerrorbucket", "WALG_USE_WAL_DELTA": "true" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/backup_mark_permanent_test_config.json b/docker/pg_tests/scripts/configs/backup_mark_permanent_test_config.json index aa26ae590..906afd667 100755 --- a/docker/pg_tests/scripts/configs/backup_mark_permanent_test_config.json +++ b/docker/pg_tests/scripts/configs/backup_mark_permanent_test_config.json @@ -1,3 +1,3 @@ "WALG_DELTA_MAX_STEPS": "0", -"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket", +"WALE_S3_PREFIX": "s3://backupmarkpermanentbucket", "WALG_USE_WAL_DELTA": "true" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/backup_perftest_config.json b/docker/pg_tests/scripts/configs/backup_perftest_config.json index 83c53fb20..ab686c75b 100755 --- a/docker/pg_tests/scripts/configs/backup_perftest_config.json +++ b/docker/pg_tests/scripts/configs/backup_perftest_config.json @@ -1,5 +1,5 @@ "WALG_DELTA_MAX_STEPS": "0", -"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket", +"WALE_S3_PREFIX": "s3://backupperftestbucket", "WALG_USE_WAL_DELTA": "true", "WALG_UPLOAD_CONCURRENCY": "1", "WALG_UPLOAD_QUEUE": "1", diff --git a/docker/pg_tests/scripts/configs/catchup_test_config.json b/docker/pg_tests/scripts/configs/catchup_test_config.json index 25e95f360..289286154 100755 --- a/docker/pg_tests/scripts/configs/catchup_test_config.json +++ b/docker/pg_tests/scripts/configs/catchup_test_config.json @@ -1,4 +1,4 @@ -"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket", +"WALE_S3_PREFIX": "s3://catchupbucket", "WALG_USE_WAL_DELTA": "true", "WALG_UPLOAD_CONCURRENCY": "1", "WALG_UPLOAD_QUEUE": "1", diff --git a/docker/pg_tests/scripts/configs/copy_backup_to_test_config copy.json b/docker/pg_tests/scripts/configs/copy_backup_to_test_config.json similarity index 100% rename from docker/pg_tests/scripts/configs/copy_backup_to_test_config copy.json rename to docker/pg_tests/scripts/configs/copy_backup_to_test_config.json diff --git a/docker/pg_tests/scripts/configs/delete_target_delta_find_full_test_config.json b/docker/pg_tests/scripts/configs/delete_target_delta_find_full_test_config.json index e01c98b95..9dc0ddcdd 100755 --- a/docker/pg_tests/scripts/configs/delete_target_delta_find_full_test_config.json +++ b/docker/pg_tests/scripts/configs/delete_target_delta_find_full_test_config.json @@ -1,3 +1,3 @@ "WALG_DELTA_MAX_STEPS": "100", -"WALE_S3_PREFIX": "s3://deletetargetbucket", +"WALE_S3_PREFIX": "s3://deletetargetdeltafindfullbucket", "WALG_USE_WAL_DELTA": "true" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/delete_target_delta_test_config.json b/docker/pg_tests/scripts/configs/delete_target_delta_test_config.json index e01c98b95..8ba4e7f51 100755 --- a/docker/pg_tests/scripts/configs/delete_target_delta_test_config.json +++ b/docker/pg_tests/scripts/configs/delete_target_delta_test_config.json @@ -1,3 +1,3 @@ "WALG_DELTA_MAX_STEPS": "100", -"WALE_S3_PREFIX": "s3://deletetargetbucket", +"WALE_S3_PREFIX": "s3://deletetargetdeltabucket", "WALG_USE_WAL_DELTA": "true" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/full_backup_streamed_test_config.json b/docker/pg_tests/scripts/configs/full_backup_streamed_test_config.json index 95e101402..4c58ecbeb 100755 --- 
a/docker/pg_tests/scripts/configs/full_backup_streamed_test_config.json +++ b/docker/pg_tests/scripts/configs/full_backup_streamed_test_config.json @@ -1,4 +1,4 @@ -"WALE_S3_PREFIX": "s3://fullbucket", +"WALE_S3_PREFIX": "s3://fullstreamedbucket", "WALG_DELTA_MAX_STEPS": "6", "WALG_PGP_KEY_PATH": "/tmp/PGP_KEY", "WALG_SERIALIZER_TYPE": "json_streamed" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/receive_wal_test_config.json b/docker/pg_tests/scripts/configs/receive_wal_test_config.json index 6747196c5..ff0bde544 100644 --- a/docker/pg_tests/scripts/configs/receive_wal_test_config.json +++ b/docker/pg_tests/scripts/configs/receive_wal_test_config.json @@ -1,4 +1,4 @@ -"WALE_S3_PREFIX": "s3://fullbucket", +"WALE_S3_PREFIX": "s3://receivewalbucket", "WALG_DELTA_MAX_STEPS": "6", "WALG_PGP_KEY_PATH": "/tmp/PGP_KEY", "WALG_RECEIVE_BULKMETADATA_PATH": "metadata_folder", diff --git a/docker/pg_tests/scripts/configs/several_delta_backups_test_config.json b/docker/pg_tests/scripts/configs/several_delta_backups_test_config.json index 043a26b36..0b20ccf07 100755 --- a/docker/pg_tests/scripts/configs/several_delta_backups_test_config.json +++ b/docker/pg_tests/scripts/configs/several_delta_backups_test_config.json @@ -1,3 +1,3 @@ -"WALE_S3_PREFIX": "s3://compressionbucket", +"WALE_S3_PREFIX": "s3://severaldeltabackupsbucket", "WALG_DELTA_MAX_STEPS": "6", "WALG_USE_WAL_DELTA": "true" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/transfer_backup_test_config.json b/docker/pg_tests/scripts/configs/transfer_backup_test_config.json new file mode 100644 index 000000000..2c5885da8 --- /dev/null +++ b/docker/pg_tests/scripts/configs/transfer_backup_test_config.json @@ -0,0 +1,12 @@ +"WALE_S3_PREFIX": "s3://transferbackupbucket/default", +"WALG_DELTA_MAX_STEPS": "6", +"WALG_LOG_LEVEL": "DEVEL", +"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY", +"WALG_FAILOVER_STORAGES": { + "failover": { + "AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + "AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE", + "WALE_S3_PREFIX": "s3://transferbackupbucket/failover" + } +}, +"WALG_FAILOVER_STORAGES_CHECK_TIMEOUT": "5s" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/transfer_backup_test_config_failover.json b/docker/pg_tests/scripts/configs/transfer_backup_test_config_failover.json new file mode 100644 index 000000000..e39c9424e --- /dev/null +++ b/docker/pg_tests/scripts/configs/transfer_backup_test_config_failover.json @@ -0,0 +1,12 @@ +"WALE_S3_PREFIX": "s3://nonexistentbucket", +"WALG_DELTA_MAX_STEPS": "6", +"WALG_LOG_LEVEL": "DEVEL", +"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY", +"WALG_FAILOVER_STORAGES": { + "failover": { + "AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + "AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE", + "WALE_S3_PREFIX": "s3://transferbackupbucket/failover" + } +}, +"WALG_FAILOVER_STORAGES_CHECK_TIMEOUT": "5s" diff --git a/docker/pg_tests/scripts/configs/wal_perftest_config.json b/docker/pg_tests/scripts/configs/wal_perftest_config.json index 83c53fb20..20315a65d 100755 --- a/docker/pg_tests/scripts/configs/wal_perftest_config.json +++ b/docker/pg_tests/scripts/configs/wal_perftest_config.json @@ -1,5 +1,5 @@ "WALG_DELTA_MAX_STEPS": "0", -"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket", +"WALE_S3_PREFIX": "s3://walperftestbucket", "WALG_USE_WAL_DELTA": "true", "WALG_UPLOAD_CONCURRENCY": "1", "WALG_UPLOAD_QUEUE": "1", diff --git a/docker/pg_tests/scripts/tests/catchup_test.sh 
b/docker/pg_tests/scripts/tests/catchup_test.sh index 056f81318..874280a39 100755 --- a/docker/pg_tests/scripts/tests/catchup_test.sh +++ b/docker/pg_tests/scripts/tests/catchup_test.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash set -e -x PGDATA="/var/lib/postgresql/10/main" @@ -96,3 +96,4 @@ popd diff ${ALPHA_DUMP} ${BETA_DUMP} +/tmp/scripts/drop_pg.sh diff --git a/docker/pg_tests/scripts/tests/config_test.sh b/docker/pg_tests/scripts/tests/config_test.sh index 619b5fddc..2033a53e8 100755 --- a/docker/pg_tests/scripts/tests/config_test.sh +++ b/docker/pg_tests/scripts/tests/config_test.sh @@ -48,3 +48,5 @@ diff /tmp/dump1 /tmp/dump2 pkill -9 postgres rm -rf "${PGDATA}" rm ${TMP_CONFIG} + +/tmp/scripts/drop_pg.sh diff --git a/docker/pg_tests/scripts/tests/copy_all_test.sh b/docker/pg_tests/scripts/tests/copy_all_test.sh index 76048b397..16514d99a 100755 --- a/docker/pg_tests/scripts/tests/copy_all_test.sh +++ b/docker/pg_tests/scripts/tests/copy_all_test.sh @@ -14,16 +14,21 @@ TO_CONFIG_FILE="/tmp/configs/copy_all_to_test_config.json" TO_TMP_CONFIG="/tmp/configs/to_tmp_config.json" cat ${TO_CONFIG_FILE} > ${TO_TMP_CONFIG} echo "," >> ${TO_TMP_CONFIG} +cat ${COMMON_CONFIG} >> ${TO_TMP_CONFIG} /tmp/scripts/wrap_config_file.sh ${TO_TMP_CONFIG} /usr/lib/postgresql/10/bin/initdb "${PGDATA}" -echo "archive_mode = on"; echo "archive_command = '/usr/bin/timeout 600 /usr/bin/wal-g --config=${TMP_CONFIG} wal-push %p'"; echo "archive_timeout = 600" >> /var/lib/postgresql/10/main/postgresql.conf +echo "archive_mode = on" >> ${PGDATA}/postgresql.conf +echo "archive_command = '/usr/bin/timeout 600 wal-g --config=${TMP_CONFIG} wal-push %p'" >> ${PGDATA}/postgresql.conf +echo "archive_timeout = 600" >> ${PGDATA}/postgresql.conf /usr/lib/postgresql/10/bin/pg_ctl -D "${PGDATA}" -w start /tmp/scripts/wait_while_pg_not_ready.sh +wal-g --config=${TMP_CONFIG} st rm / --target=all || true + sleep 1 echo "$WALG_DELTA_MAX_STEPS" @@ -62,7 +67,7 @@ wal-g --config=${TO_TMP_CONFIG} backup-list | cut -f 1 -d " " | sort > /tmp/copi # and count lines in diff lines_count=$(diff /tmp/actual_backup_list /tmp/copied_backup_list | wc -l) -if [ lines_count > 0 ]; +if [ $lines_count -gt 0 ]; then echo "Copying all backups failed" exit 2 diff --git a/docker/pg_tests/scripts/tests/copy_backup_test.sh b/docker/pg_tests/scripts/tests/copy_backup_test.sh old mode 100644 new mode 100755 index 9cd0897d1..98877df47 --- a/docker/pg_tests/scripts/tests/copy_backup_test.sh +++ b/docker/pg_tests/scripts/tests/copy_backup_test.sh @@ -14,18 +14,21 @@ TO_CONFIG_FILE="/tmp/configs/copy_backup_to_test_config.json" TO_TMP_CONFIG="/tmp/configs/to_tmp_config.json" cat ${TO_CONFIG_FILE} > ${TO_TMP_CONFIG} echo "," >> ${TO_TMP_CONFIG} +cat ${COMMON_CONFIG} >> ${TO_TMP_CONFIG} /tmp/scripts/wrap_config_file.sh ${TO_TMP_CONFIG} /usr/lib/postgresql/10/bin/initdb ${PGDATA} -echo "archive_mode = on" >> /var/lib/postgresql/10/main/postgresql.conf -echo "archive_command = '/usr/bin/timeout 600 /usr/bin/wal-g --config=${TMP_CONFIG} wal-push %p'" >> /var/lib/postgresql/10/main/postgresql.conf -echo "archive_timeout = 600" >> /var/lib/postgresql/10/main/postgresql.conf +echo "archive_mode = on" >> ${PGDATA}/postgresql.conf +echo "archive_command = '/usr/bin/timeout 600 wal-g --config=${TMP_CONFIG} wal-push %p'" >> ${PGDATA}/postgresql.conf +echo "archive_timeout = 600" >> ${PGDATA}/postgresql.conf /usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start /tmp/scripts/wait_while_pg_not_ready.sh +wal-g --config=${TMP_CONFIG} st rm / --target=all || true + sleep 1 
echo $WALG_DELTA_MAX_STEPS @@ -38,7 +41,7 @@ wal-g --config=${TMP_CONFIG} backup-list # copy backup with backup-name backup_name=$(wal-g --config=${TMP_CONFIG} backup-list | tail -n 1 | cut -f 1 -d " ") wal-g copy --backup-name=${backup_name} --from=${TMP_CONFIG} --to=${TO_TMP_CONFIG} --without-history -copied_backup_name=`wal-g --config=${TO_TMP_CONFIG} backup-list | tail -n 1 | cut -f 1 -d " "` +copied_backup_name=$(wal-g --config=${TO_TMP_CONFIG} backup-list | tail -n 1 | cut -f 1 -d " ") if [ $backup_name != $copied_backup_name ]; then @@ -70,12 +73,10 @@ wal-g copy --backup-name=${backup_name} --from=${TMP_CONFIG} --to=${TO_TMP_CONFI copied_backup_name=$(wal-g --config=${TO_TMP_CONFIG} backup-list | tail -n 1 | cut -f 1 -d " ") # check if backup copied -if [ "$last_backup_name" != "$copied_last_backup_name" ]; +if [ "$backup_name" != "$copied_backup_name" ]; then echo "Copying backup failed" exit 2 fi /tmp/scripts/drop_pg.sh - -echo "Copying backup test success!!!!!!" diff --git a/docker/pg_tests/scripts/tests/daemon_test.sh b/docker/pg_tests/scripts/tests/daemon_test.sh old mode 100644 new mode 100755 index 752514b57..7d401067b --- a/docker/pg_tests/scripts/tests/daemon_test.sh +++ b/docker/pg_tests/scripts/tests/daemon_test.sh @@ -50,3 +50,5 @@ else echo "Error in WAL-G response." exit 1 fi + +/tmp/scripts/drop_pg.sh diff --git a/docker/pg_tests/scripts/tests/full_backup_copy_composer_test.sh b/docker/pg_tests/scripts/tests/full_backup_copy_composer_test.sh index 91217a84d..a1643851d 100755 --- a/docker/pg_tests/scripts/tests/full_backup_copy_composer_test.sh +++ b/docker/pg_tests/scripts/tests/full_backup_copy_composer_test.sh @@ -9,4 +9,6 @@ cat ${COMMON_CONFIG} >> ${TMP_CONFIG} /tmp/scripts/wrap_config_file.sh ${TMP_CONFIG} . /tmp/tests/test_functions/test_copy_composer.sh -test_copy_composer ${TMP_CONFIG} \ No newline at end of file +test_copy_composer ${TMP_CONFIG} + +/tmp/scripts/drop_pg.sh diff --git a/docker/pg_tests/scripts/tests/full_backup_database_composer_test.sh b/docker/pg_tests/scripts/tests/full_backup_database_composer_test.sh index 763d20a3a..37b8aaefd 100755 --- a/docker/pg_tests/scripts/tests/full_backup_database_composer_test.sh +++ b/docker/pg_tests/scripts/tests/full_backup_database_composer_test.sh @@ -9,4 +9,6 @@ cat ${COMMON_CONFIG} >> ${TMP_CONFIG} /tmp/scripts/wrap_config_file.sh ${TMP_CONFIG} . 
/tmp/tests/test_functions/test_full_backup.sh -test_full_backup ${TMP_CONFIG} \ No newline at end of file +test_full_backup ${TMP_CONFIG} + +/tmp/scripts/drop_pg.sh diff --git a/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh b/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh index 8b62f228e..e5165bce3 100755 --- a/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh +++ b/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh @@ -25,13 +25,13 @@ echo "archive_timeout = 600" >> ${PGDATA}/postgresql.conf /tmp/scripts/wait_while_pg_not_ready.sh -wal-g --config=${TMP_CONFIG} delete everything FORCE --confirm +wal-g --config=${ARCHIVE_TMP_CONFIG} st rm / --target=all || true pgbench -i -s 5 postgres pg_dumpall -f /tmp/dump1 pgbench -c 2 -T 100000000 -S & sleep 1 -wal-g --config=${TMP_CONFIG} backup-push ${PGDATA} +wal-g --config=${ARCHIVE_TMP_CONFIG} backup-push ${PGDATA} /tmp/scripts/drop_pg.sh wal-g --config=${TMP_CONFIG} backup-fetch ${PGDATA} LATEST diff --git a/docker/pg_tests/scripts/tests/partial_restore_test.sh b/docker/pg_tests/scripts/tests/partial_restore_test.sh index 82fefd337..b3b80cf43 100755 --- a/docker/pg_tests/scripts/tests/partial_restore_test.sh +++ b/docker/pg_tests/scripts/tests/partial_restore_test.sh @@ -66,6 +66,7 @@ else fi if psql -t -c "SELECT COUNT(*) FROM tbl;" -d third -A 2>&1 | grep -q "is not a valid data directory"; then + /tmp/scripts/drop_pg.sh echo "Skipped database raises error, as it should be!!!!!" else echo "Skipped database responses unexpectedly" diff --git a/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh b/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh index 25f8306f7..d5d80f1ee 100755 --- a/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh +++ b/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh @@ -83,6 +83,9 @@ remote_backup_and_restore_test() { echo Comparing source and destination if diff "${TMPDIR}"/*dump.sql; then + /tmp/scripts/drop_pg.sh + rm ${TMP_CONFIG} + rm -rf ${PGTBS} echo OK else echo Ouch diff --git a/docker/pg_tests/scripts/tests/test_functions/test_full_backup.sh b/docker/pg_tests/scripts/tests/test_functions/test_full_backup.sh index cbc8a6064..b6cae99e2 100755 --- a/docker/pg_tests/scripts/tests/test_functions/test_full_backup.sh +++ b/docker/pg_tests/scripts/tests/test_functions/test_full_backup.sh @@ -63,7 +63,7 @@ test_full_backup() /tmp/scripts/wait_while_pg_not_ready.sh - wal-g --config=${TMP_CONFIG} delete everything FORCE --confirm + wal-g --config=${TMP_CONFIG} st rm / --target=all || true pgbench -i -s 5 postgres diff --git a/docker/pg_tests/scripts/tests/test_functions/test_receive_wal.sh b/docker/pg_tests/scripts/tests/test_functions/test_receive_wal.sh index 85ca52761..9a00f618d 100755 --- a/docker/pg_tests/scripts/tests/test_functions/test_receive_wal.sh +++ b/docker/pg_tests/scripts/tests/test_functions/test_receive_wal.sh @@ -27,6 +27,8 @@ test_receive_wal() # check verify results to end with 'OK' if echo "$VERIFY_RESULT" | grep -qP "\bOK$"; then + /tmp/scripts/drop_pg.sh + rm ${TMP_CONFIG} echo "WAL receive success!!!!!!" 
return 0 fi diff --git a/docker/pg_tests/scripts/tests/transfer_backup_test.sh b/docker/pg_tests/scripts/tests/transfer_backup_test.sh new file mode 100755 index 000000000..a9c9173b1 --- /dev/null +++ b/docker/pg_tests/scripts/tests/transfer_backup_test.sh @@ -0,0 +1,59 @@ +#!/bin/sh +set -e -x +CONFIG_FILE="/tmp/configs/transfer_backup_test_config.json" +FAILOVER_CONFIG_FILE="/tmp/configs/transfer_backup_test_config_failover.json" +COMMON_CONFIG="/tmp/configs/common_config.json" +TMP_CONFIG="/tmp/configs/tmp_config.json" +FAILOVER_TMP_CONFIG="/tmp/configs/failover_tmp_config.json" +cat ${CONFIG_FILE} > ${TMP_CONFIG} +echo "," >> ${TMP_CONFIG} +cat ${COMMON_CONFIG} >> ${TMP_CONFIG} +/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG} + +cat ${FAILOVER_CONFIG_FILE} > ${FAILOVER_TMP_CONFIG} +echo "," >> ${FAILOVER_TMP_CONFIG} +cat ${COMMON_CONFIG} >> ${FAILOVER_TMP_CONFIG} +/tmp/scripts/wrap_config_file.sh ${FAILOVER_TMP_CONFIG} + +/usr/lib/postgresql/10/bin/initdb ${PGDATA} + +echo "archive_mode = on" >> ${PGDATA}/postgresql.conf +echo "archive_command = '/usr/bin/timeout 600 wal-g --config=${FAILOVER_TMP_CONFIG} wal-push %p'" >> ${PGDATA}/postgresql.conf +echo "archive_timeout = 600" >> ${PGDATA}/postgresql.conf + +/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start + +/tmp/scripts/wait_while_pg_not_ready.sh + +wal-g --config=${TMP_CONFIG} st rm / --target=all || true + +pgbench -i -s 5 postgres +pg_dumpall -f /tmp/dump1 +pgbench -c 2 -T 100000000 -S & +sleep 1 +wal-g --config=${FAILOVER_TMP_CONFIG} backup-push ${PGDATA} +/tmp/scripts/drop_pg.sh + +wal-g --config=${TMP_CONFIG} st transfer backups --source=failover --target=default --overwrite --fail-fast +wal-g --config=${TMP_CONFIG} st transfer pg-wals --source=failover --target=default --overwrite --fail-fast + +wal-g --config=${TMP_CONFIG} backup-fetch ${PGDATA} LATEST + +echo "restore_command = 'echo \"WAL file restoration: %f, %p\"&& wal-g --config=${TMP_CONFIG} wal-fetch \"%f\" \"%p\"'" > ${PGDATA}/recovery.conf + +wal-g --config=${TMP_CONFIG} st ls -r --target failover +wal-g --config=${TMP_CONFIG} st ls -r --target default + +/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start +/tmp/scripts/wait_while_pg_not_ready.sh +pg_dumpall -f /tmp/dump2 + +diff /tmp/dump1 /tmp/dump2 + +psql -f /tmp/scripts/amcheck.sql -v "ON_ERROR_STOP=1" postgres + +/tmp/scripts/drop_pg.sh +rm $TMP_CONFIG +rm $FAILOVER_TMP_CONFIG + +echo "Full backup success!!!!!!" 
diff --git a/docker/pg_tests/scripts/tests/wal_restore_test.sh b/docker/pg_tests/scripts/tests/wal_restore_test.sh index 02dfa9407..8b6f5f782 100755 --- a/docker/pg_tests/scripts/tests/wal_restore_test.sh +++ b/docker/pg_tests/scripts/tests/wal_restore_test.sh @@ -101,3 +101,5 @@ timeout 30 wal-g --config=${TMP_CONFIG} wal-restore ${PGDATA_ALPHA} ${PGDATA_BET sleep 10 /usr/lib/postgresql/10/bin/pg_rewind -D ${PGDATA_ALPHA} --source-pgdata=${PGDATA_BETA} + +/tmp/scripts/drop_pg.sh diff --git a/docker/st_tests/scripts/configs/transfer_test_config.json b/docker/st_tests/scripts/configs/transfer_files_test_config.json similarity index 67% rename from docker/st_tests/scripts/configs/transfer_test_config.json rename to docker/st_tests/scripts/configs/transfer_files_test_config.json index 5756cb942..5b43432ac 100644 --- a/docker/st_tests/scripts/configs/transfer_test_config.json +++ b/docker/st_tests/scripts/configs/transfer_files_test_config.json @@ -1,8 +1,8 @@ { - "WALE_S3_PREFIX": "s3://sttransferbucket", + "WALE_S3_PREFIX": "s3://sttransferfilesbucket", "WALG_FAILOVER_STORAGES": { "failover": { - "WALE_S3_PREFIX": "s3://sttransferfailoverbucket", + "WALE_S3_PREFIX": "s3://sttransferfilesfailoverbucket", "AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE", "AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" } diff --git a/docker/st_tests/scripts/tests/transfer_test.sh b/docker/st_tests/scripts/tests/transfer_files_test.sh similarity index 88% rename from docker/st_tests/scripts/tests/transfer_test.sh rename to docker/st_tests/scripts/tests/transfer_files_test.sh index bee8e5397..0cbad30ca 100755 --- a/docker/st_tests/scripts/tests/transfer_test.sh +++ b/docker/st_tests/scripts/tests/transfer_files_test.sh @@ -1,11 +1,11 @@ #!/bin/bash set -e -x -CONFIG="/tmp/configs/transfer_test_config.json" -TESTDATA="transfer" +CONFIG="/tmp/configs/transfer_files_test_config.json" +TESTDATA="transfer_files" echo "Upload 50 random files to the failover storage" -mkdir transfer +mkdir $TESTDATA for i in {1..50} do head -c 1M "$TESTDATA/$i" @@ -32,7 +32,7 @@ do done echo "Call the command to transfer files from the failover storage to the primary one" -wal-g --config=$CONFIG st transfer "a/" --source=failover --target=default +wal-g --config=$CONFIG st transfer files "a/" --source=failover --target=default echo "Check that all the target files are moved to the primary storage" wal-g --config=$CONFIG st ls -r "a/b/" diff --git a/docs/StorageTools.md b/docs/StorageTools.md index 4a72f85a9..935ad6119 100644 --- a/docs/StorageTools.md +++ b/docs/StorageTools.md @@ -39,12 +39,17 @@ Examples: ``wal-g st cat path/to/remote_file.json`` show `remote_file.json` ### ``rm`` -Remove the specified storage object. +Remove the specified storage object(s). +Any prefix may be specified as the argument. If there's a file with this path, it is removed. If not, but there's a directory with this path - all files from it and its subdirectories are removed. -Example: +Examples: ``wal-g st rm path/to/remote_file`` remove the file from storage. +``wal-g st rm path/to/remote_file_or_directory`` remove a file or all files in the directory. + +``wal-g st rm path/to/remote_directory/`` explicitly specify that the path points to a directory, not a file. + ### ``put`` Upload the specified file to the storage. By default, the command will try to apply the compression and encryption (if configured). @@ -58,13 +63,22 @@ Example: ``wal-g st put path/to/local_file path/to/remote_file`` upload the local file to the storage. 
### `transfer` -Transfer all files from one configured storage to another. Is usually used to move files from a failover storage to the primary one when it becomes alive. +Transfer files from one configured storage to another. Is usually used to move files from a failover storage to the primary one when it becomes alive. -Args: +Subcommands: +1. `transfer files prefix` - moves arbitrary files without any special treatment. + + Argument `prefix` is path to a directory in both storages, where files should be moved to/from. Files from all subdirectories are also moved. -1. Path to the directory in both storages, where files should be moved to/from. Files from all subdirectories are also moved. +2. `transfer pg-wals` - moves PostgreSQL WAL files only (just an alias for `transfer files "wal_005/"`). -Flags: +3. `transfer backups [--max-backups=N]` - consistently moves backups. + + To prevent any problems with restoring from a partially uploaded/removed backup, the signal file `*_backup_stop_sentinel.json` is moved to the target storage last, and deleted from the source storage first. + + An additional flag is supported: `--max-backups` specifies max number of backups to move in this run. + +Flags (supported in every subcommand): 1. Add `-s (--source)` to specify the source storage name to take files from. To specify the primary storage, use `default`. This flag is required. @@ -86,7 +100,7 @@ Flags: 5. Add `-c (--concurrency)` to set the max number of concurrent workers that will move files. -6. Add `-m (--max)` to set the max number of files to move in a single command run. +6. Add `-m (--max-files)` to set the max number of files to move in a single command run. 7. Add `--appearance-checks` to set the max number of checks for files to appear in the target storage, which will be performed after moving the file and before deleting it.
@@ -99,8 +113,10 @@ Flags: Examples: -``wal-g st transfer / --source='my_failover_ssh'`` +``wal-g st transfer pg-wals --source='my_failover_ssh'`` + +``wal-g st transfer files folder/single_file.json --source='default' --target='my_failover_ssh' --overwrite`` -``wal-g st transfer folder/single_file.json --source='default' --target='my_failover_ssh' --overwrite`` +``wal-g st transfer files basebackups_005/ --source='my_failover_s3' --target='default' --fail-fast -c=50 -m=10000 --appearance-checks=5 --appearance-checks-interval=1s`` -``wal-g st transfer basebackups_005/ --source='my_failover_s3' --target='default' --fail-fast -c=50 -m=10000 --appearance-checks=5 --appearance-checks-interval=1s`` +``wal-g st transfer backups --source='my_failover_s3' --target='default' --fail-fast -c=50 --max-files=10000 --max-backups=10 --appearance-checks=5 --appearance-checks-interval=1s`` diff --git a/internal/databases/greenplum/segment_backup_push_handler.go b/internal/databases/greenplum/segment_backup_push_handler.go index 6ed340c54..cb4c2022f 100644 --- a/internal/databases/greenplum/segment_backup_push_handler.go +++ b/internal/databases/greenplum/segment_backup_push_handler.go @@ -17,7 +17,7 @@ func NewSegBackupHandler(arguments postgres.BackupArguments) (*postgres.BackupHa return err } - maker, err := NewGpTarBallComposerMaker(relStorageMap, bh.Workers.Uploader, handler.CurBackupInfo.Name) + maker, err := NewGpTarBallComposerMaker(relStorageMap, bh.Arguments.Uploader, handler.CurBackupInfo.Name) if err != nil { return err } diff --git a/internal/databases/postgres/backup_push_handler.go b/internal/databases/postgres/backup_push_handler.go index 94fc47216..3c45e7531 100644 --- a/internal/databases/postgres/backup_push_handler.go +++ b/internal/databases/postgres/backup_push_handler.go @@ -48,6 +48,7 @@ func (err backupFromOtherBD) Error() string { // BackupArguments holds all arguments parsed from cmd to this handler class type BackupArguments struct { + Uploader internal.Uploader isPermanent bool verifyPageChecksums bool storeAllCorruptBlocks bool @@ -90,7 +91,6 @@ type PrevBackupInfo struct { // BackupWorkers holds the external objects that the handler uses to get the backup data / write the backup data type BackupWorkers struct { - Uploader internal.Uploader Bundle *Bundle QueryRunner *PgQueryRunner } @@ -112,10 +112,11 @@ type BackupHandler struct { } // NewBackupArguments creates a BackupArgument object to hold the arguments from the cmd -func NewBackupArguments(pgDataDirectory string, backupsFolder string, isPermanent bool, verifyPageChecksums bool, - isFullBackup bool, storeAllCorruptBlocks bool, tarBallComposerType TarBallComposerType, +func NewBackupArguments(uploader internal.Uploader, pgDataDirectory string, backupsFolder string, isPermanent bool, + verifyPageChecksums bool, isFullBackup bool, storeAllCorruptBlocks bool, tarBallComposerType TarBallComposerType, deltaConfigurator DeltaBackupConfigurator, userData interface{}, withoutFilesMetadata bool) BackupArguments { return BackupArguments{ + Uploader: uploader, pgDataDirectory: pgDataDirectory, backupsFolder: backupsFolder, isPermanent: isPermanent, @@ -133,11 +134,11 @@ func NewBackupArguments(pgDataDirectory string, backupsFolder string, isPermanen func (bh *BackupHandler) createAndPushBackup() { var err error - folder := bh.Workers.Uploader.Folder() + folder := bh.Arguments.Uploader.Folder() // TODO: AB: this subfolder switch look ugly. // I think typed storage folders could be better (i.e. 
interface BasebackupStorageFolder, WalStorageFolder etc) - bh.Workers.Uploader.ChangeDirectory(bh.Arguments.backupsFolder) - tracelog.DebugLogger.Printf("Uploading folder: %s", bh.Workers.Uploader.Folder()) + bh.Arguments.Uploader.ChangeDirectory(bh.Arguments.backupsFolder) + tracelog.DebugLogger.Printf("Uploading folder: %s", bh.Arguments.Uploader.Folder()) arguments := bh.Arguments crypter := internal.ConfigureCrypter() @@ -243,7 +244,7 @@ func (bh *BackupHandler) SetComposerInitFunc(initFunc func(handler *BackupHandle func configureTarBallComposer(bh *BackupHandler, tarBallComposerType TarBallComposerType) error { maker, err := NewTarBallComposerMaker(tarBallComposerType, bh.Workers.QueryRunner, - bh.Workers.Uploader, bh.CurBackupInfo.Name, + bh.Arguments.Uploader, bh.CurBackupInfo.Name, NewTarBallFilePackerOptions(bh.Arguments.verifyPageChecksums, bh.Arguments.storeAllCorruptBlocks), bh.Arguments.withoutFilesMetadata) if err != nil { @@ -257,7 +258,7 @@ func (bh *BackupHandler) uploadBackup() internal.TarFileSets { bundle := bh.Workers.Bundle // Start a new tar bundle, walk the pgDataDirectory and upload everything there. tracelog.InfoLogger.Println("Starting a new tar bundle") - err := bundle.StartQueue(internal.NewStorageTarBallMaker(bh.CurBackupInfo.Name, bh.Workers.Uploader)) + err := bundle.StartQueue(internal.NewStorageTarBallMaker(bh.CurBackupInfo.Name, bh.Arguments.Uploader)) tracelog.ErrorLogger.FatalOnError(err) err = bh.Arguments.composerInitFunc(bh) @@ -276,7 +277,7 @@ func (bh *BackupHandler) uploadBackup() internal.TarFileSets { tracelog.ErrorLogger.FatalOnError(err) tracelog.DebugLogger.Println("Uploading pg_control ...") - err = bundle.UploadPgControl(bh.Workers.Uploader.Compression().FileExtension()) + err = bundle.UploadPgControl(bh.Arguments.Uploader.Compression().FileExtension()) tracelog.ErrorLogger.FatalOnError(err) // Stops backup and write/upload postgres `backup_label` and `tablespace_map` Files @@ -285,7 +286,7 @@ func (bh *BackupHandler) uploadBackup() internal.TarFileSets { tracelog.ErrorLogger.FatalOnError(err) bh.CurBackupInfo.endLSN = finishLsn bh.CurBackupInfo.uncompressedSize = atomic.LoadInt64(bundle.TarBallQueue.AllTarballsSize) - bh.CurBackupInfo.compressedSize, err = bh.Workers.Uploader.UploadedDataSize() + bh.CurBackupInfo.compressedSize, err = bh.Arguments.Uploader.UploadedDataSize() bh.CurBackupInfo.dataCatalogSize = atomic.LoadInt64(bundle.DataCatalogSize) tracelog.ErrorLogger.FatalOnError(err) tarFileSets.AddFiles(labelFilesTarBallName, labelFilesList) @@ -298,8 +299,8 @@ func (bh *BackupHandler) uploadBackup() internal.TarFileSets { // Wait for all uploads to finish. 
tracelog.DebugLogger.Println("Waiting for all uploads to finish") - bh.Workers.Uploader.Finish() - if bh.Workers.Uploader.Failed() { + bh.Arguments.Uploader.Finish() + if bh.Arguments.Uploader.Failed() { tracelog.ErrorLogger.Fatalf("Uploading failed during '%s' backup.\n", bh.CurBackupInfo.Name) } if timelineChanged { @@ -347,7 +348,7 @@ func (bh *BackupHandler) handleBackupPushLocal() { } } - folder := bh.Workers.Uploader.Folder() + folder := bh.Arguments.Uploader.Folder() baseBackupFolder := folder.GetSubFolder(bh.Arguments.backupsFolder) tracelog.DebugLogger.Printf("Base backup folder: %s", baseBackupFolder) @@ -367,7 +368,7 @@ func (bh *BackupHandler) handleBackupPushLocal() { func (bh *BackupHandler) createAndPushRemoteBackup() { var err error - uploader := bh.Workers.Uploader + uploader := bh.Arguments.Uploader uploader.ChangeDirectory(utility.BaseBackupPath) tracelog.DebugLogger.Printf("Uploading folder: %s", uploader.Folder()) @@ -384,7 +385,7 @@ func (bh *BackupHandler) createAndPushRemoteBackup() { bh.CurBackupInfo.endLSN = LSN(baseBackup.EndLSN) bh.CurBackupInfo.uncompressedSize = baseBackup.UncompressedSize - bh.CurBackupInfo.compressedSize, err = bh.Workers.Uploader.UploadedDataSize() + bh.CurBackupInfo.compressedSize, err = bh.Arguments.Uploader.UploadedDataSize() tracelog.ErrorLogger.FatalOnError(err) sentinelDto := NewBackupSentinelDto(bh, baseBackup.GetTablespaceSpec()) filesMetadataDto := NewFilesMetadataDto(baseBackup.Files, tarFileSets) @@ -408,7 +409,7 @@ func (bh *BackupHandler) uploadMetadata(sentinelDto BackupSentinelDto, filesMeta if err != nil { tracelog.ErrorLogger.Fatalf("Failed to upload files metadata for backup %s: %v", curBackupName, err) } - err = internal.UploadSentinel(bh.Workers.Uploader, NewBackupSentinelDtoV2(sentinelDto, meta), bh.CurBackupInfo.Name) + err = internal.UploadSentinel(bh.Arguments.Uploader, NewBackupSentinelDtoV2(sentinelDto, meta), bh.CurBackupInfo.Name) if err != nil { tracelog.ErrorLogger.Fatalf("Failed to upload sentinel file for backup %s: %v", curBackupName, err) } @@ -439,10 +440,6 @@ func NewBackupHandler(arguments BackupArguments) (bh *BackupHandler, err error) // and version cannot be read easily using replication connection. // Retrieve both with this helper function which uses a temp connection to postgres. 
- uploader, err := internal.ConfigureUploader() - if err != nil { - return nil, err - } pgInfo, err := getPgServerInfo() if err != nil { return nil, err @@ -450,10 +447,7 @@ func NewBackupHandler(arguments BackupArguments) (bh *BackupHandler, err error) bh = &BackupHandler{ Arguments: arguments, - Workers: BackupWorkers{ - Uploader: uploader, - }, - PgInfo: pgInfo, + PgInfo: pgInfo, } return bh, nil @@ -486,7 +480,7 @@ func (bh *BackupHandler) runRemoteBackup() *StreamingBaseBackup { tracelog.ErrorLogger.FatalOnError(err) tracelog.InfoLogger.Println("Streaming remote backup") - err = baseBackup.Upload(bh.Workers.Uploader, bundleFiles) + err = baseBackup.Upload(bh.Arguments.Uploader, bundleFiles) tracelog.ErrorLogger.FatalOnError(err) tracelog.InfoLogger.Println("Finishing backup") @@ -550,7 +544,7 @@ func (bh *BackupHandler) uploadExtendedMetadata(meta ExtendedMetadataDto) (err e return internal.NewSentinelMarshallingError(metaFile, err) } tracelog.DebugLogger.Printf("Uploading metadata file (%s):\n%s", metaFile, dtoBody) - return bh.Workers.Uploader.Upload(metaFile, bytes.NewReader(dtoBody)) + return bh.Arguments.Uploader.Upload(metaFile, bytes.NewReader(dtoBody)) } func (bh *BackupHandler) uploadFilesMetadata(filesMetaDto FilesMetadataDto) (err error) { @@ -563,7 +557,7 @@ func (bh *BackupHandler) uploadFilesMetadata(filesMetaDto FilesMetadataDto) (err if err != nil { return err } - return bh.Workers.Uploader.Upload(getFilesMetadataPath(bh.CurBackupInfo.Name), bytes.NewReader(dtoBody)) + return bh.Arguments.Uploader.Upload(getFilesMetadataPath(bh.CurBackupInfo.Name), bytes.NewReader(dtoBody)) } func (bh *BackupHandler) checkPgVersionAndPgControl() { diff --git a/internal/databases/postgres/catchup_push_handler.go b/internal/databases/postgres/catchup_push_handler.go index bf716f3ed..a232c8abe 100644 --- a/internal/databases/postgres/catchup_push_handler.go +++ b/internal/databases/postgres/catchup_push_handler.go @@ -14,6 +14,9 @@ func extendExcludedFiles() { // HandleCatchupPush is invoked to perform a wal-g catchup-push func HandleCatchupPush(pgDataDirectory string, fromLSN LSN) { + uploader, err := internal.ConfigureUploader() + tracelog.ErrorLogger.FatalOnError(err) + pgDataDirectory = utility.ResolveSymlink(pgDataDirectory) fakePreviousBackupSentinelDto := BackupSentinelDto{ @@ -26,7 +29,7 @@ func HandleCatchupPush(pgDataDirectory string, fromLSN LSN) { tracelog.ErrorLogger.FatalfOnError("Failed to unmarshal the provided UserData: %s", err) backupArguments := NewBackupArguments( - pgDataDirectory, utility.CatchupPath, false, + uploader, pgDataDirectory, utility.CatchupPath, false, false, false, false, RegularComposer, NewCatchupDeltaBackupConfigurator(fakePreviousBackupSentinelDto), userData, false) diff --git a/internal/storagetools/delete_object_handler.go b/internal/storagetools/delete_object_handler.go deleted file mode 100644 index d79cead96..000000000 --- a/internal/storagetools/delete_object_handler.go +++ /dev/null @@ -1,24 +0,0 @@ -package storagetools - -import ( - "fmt" - - "github.com/wal-g/wal-g/pkg/storages/storage" -) - -func HandleDeleteObject(objectPath string, folder storage.Folder) error { - // some storages may not produce an error on deleting the non-existing object - exists, err := folder.Exists(objectPath) - if err != nil { - return fmt.Errorf("check object existence: %v", err) - } - - if !exists { - return fmt.Errorf("object %s does not exist", objectPath) - } - err = folder.DeleteObjects([]string{objectPath}) - if err != nil { - return fmt.Errorf("delete the 
specified object: %v", err) - } - return nil -} diff --git a/internal/storagetools/remove_handler.go b/internal/storagetools/remove_handler.go new file mode 100644 index 000000000..1a29a6fd5 --- /dev/null +++ b/internal/storagetools/remove_handler.go @@ -0,0 +1,29 @@ +package storagetools + +import ( + "fmt" + + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func HandleRemove(prefix string, folder storage.Folder) error { + objects, err := storage.ListFolderRecursivelyWithPrefix(folder, prefix) + if err != nil { + return fmt.Errorf("list files by prefix: %w", err) + } + + if len(objects) == 0 { + return fmt.Errorf("object or folder %q does not exist", prefix) + } + + paths := make([]string, len(objects)) + for i, obj := range objects { + paths[i] = obj.GetName() + } + + err = folder.DeleteObjects(paths) + if err != nil { + return fmt.Errorf("delete objects by the prefix: %v", err) + } + return nil +} diff --git a/internal/storagetools/remove_handler_test.go b/internal/storagetools/remove_handler_test.go new file mode 100644 index 000000000..dfb1f3d19 --- /dev/null +++ b/internal/storagetools/remove_handler_test.go @@ -0,0 +1,75 @@ +package storagetools + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wal-g/wal-g/pkg/storages/memory" +) + +func TestHandleRemove(t *testing.T) { + t.Run("throw err when there is no files at prefix", func(t *testing.T) { + emptyFolder := memory.NewFolder("test/", memory.NewStorage()) + err := HandleRemove("a/b/c/nonexistent", emptyFolder) + require.Error(t, err) + assert.Contains(t, err.Error(), "does not exist") + }) + + t.Run("remove single file", func(t *testing.T) { + folder := memory.NewFolder("test/", memory.NewStorage()) + + targetFile := "a/b/c/target" + targetFolder := []string{ + "a/b/c/target/1", + "a/b/c/target/1/2", + "a/b/c/target/1/2/3", + } + for _, f := range append(targetFolder, targetFile) { + err := folder.PutObject(f, bytes.NewBufferString("123")) + require.NoError(t, err) + } + + err := HandleRemove("a/b/c/target", folder) + require.NoError(t, err) + + exists, err := folder.Exists(targetFile) + require.NoError(t, err) + assert.False(t, exists) + + for _, f := range targetFolder { + exists, err = folder.Exists(f) + require.NoError(t, err) + assert.True(t, exists) + } + }) + + t.Run("remove all files in folder", func(t *testing.T) { + folder := memory.NewFolder("test/", memory.NewStorage()) + + targetFile := "a/b/c/target" + targetFolder := []string{ + "a/b/c/target/1", + "a/b/c/target/1/2", + "a/b/c/target/1/2/3", + } + for _, f := range append(targetFolder, targetFile) { + err := folder.PutObject(f, bytes.NewBufferString("123")) + require.NoError(t, err) + } + + err := HandleRemove("a/b/c/target/", folder) + require.NoError(t, err) + + exists, err := folder.Exists(targetFile) + require.NoError(t, err) + assert.True(t, exists) + + for _, f := range targetFolder { + exists, err = folder.Exists(f) + require.NoError(t, err) + assert.False(t, exists) + } + }) +} diff --git a/internal/storagetools/transfer/backup_file_lister.go b/internal/storagetools/transfer/backup_file_lister.go new file mode 100644 index 000000000..16001d656 --- /dev/null +++ b/internal/storagetools/transfer/backup_file_lister.go @@ -0,0 +1,148 @@ +package transfer + +import ( + "path" + "strings" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/pkg/storages/storage" + "github.com/wal-g/wal-g/utility" +) + +type BackupFileLister struct { + Name string + Overwrite bool + MaxFiles int 
+ MaxBackups int +} + +const prefix = utility.BaseBackupPath + +func NewSingleBackupFileLister(name string, overwrite bool, maxFiles int) *BackupFileLister { + return &BackupFileLister{ + Name: name, + Overwrite: overwrite, + MaxFiles: maxFiles, + MaxBackups: 1, + } +} + +func NewAllBackupsFileLister(overwrite bool, maxFiles, maxBackups int) *BackupFileLister { + return &BackupFileLister{ + Name: "", + Overwrite: overwrite, + MaxFiles: maxFiles, + MaxBackups: maxBackups, + } +} + +func (l *BackupFileLister) ListFilesToMove(source, target storage.Folder) (files []FilesGroup, num int, err error) { + missingFiles, err := listMissingFiles(source, target, prefix, l.Overwrite) + if err != nil { + return nil, 0, err + } + backups := findBackups(missingFiles, l.Name) + fileGroups, filesNum := groupAndLimitBackupFiles(backups, l.MaxFiles, l.MaxBackups) + return fileGroups, filesNum, nil +} + +type backupFiles struct { + sentinel storage.Object + backupData []storage.Object +} + +func findBackups(files map[string]storage.Object, targetName string) map[string]backupFiles { + backups := map[string]backupFiles{} + for filePath, file := range files { + category, backupName := categoriseFile(filePath) + if category == fileCategoryOther { + continue + } + if targetName != "" && targetName != backupName { + continue + } + backup := backups[backupName] + switch category { + case fileCategorySentinel: + backup.sentinel = file + case fileCategoryBackupData: + backup.backupData = append(backup.backupData, file) + } + backups[backupName] = backup + } + tracelog.InfoLogger.Printf("Backups missing in the target storage: %d", len(backups)) + return backups +} + +type fileCategory int + +const ( + fileCategoryOther fileCategory = iota + fileCategorySentinel + fileCategoryBackupData +) + +func categoriseFile(filePath string) (category fileCategory, backupName string) { + dir, fileName := path.Split(strings.TrimPrefix(filePath, prefix)) + if dir == "" && + strings.HasPrefix(fileName, utility.BackupNamePrefix) && + strings.HasSuffix(fileName, utility.SentinelSuffix) { + backupName = strings.TrimSuffix(fileName, utility.SentinelSuffix) + return fileCategorySentinel, backupName + } + if strings.HasPrefix(dir, utility.BackupNamePrefix) { + firstSlash := strings.Index(dir, "/") + return fileCategoryBackupData, dir[:firstSlash] + } + return fileCategoryOther, "" +} + +func groupAndLimitBackupFiles(backups map[string]backupFiles, maxFiles, maxBackups int) (files []FilesGroup, num int) { + filesCount := 0 + fileGroups := make([]FilesGroup, 0, len(backups)) + for name, backup := range backups { + if backup.sentinel == nil { + tracelog.InfoLogger.Printf("Skip incomplete backup without sentinel file: %s", name) + continue + } + if len(backup.backupData) == 0 { + tracelog.WarningLogger.Printf("Backup doesn't have any data: %s", name) + continue + } + + group := linkGroup(backup) + + if filesCount+len(group) > maxFiles { + break + } + filesCount += len(group) + + fileGroups = append(fileGroups, group) + if len(fileGroups) >= maxBackups { + break + } + } + tracelog.InfoLogger.Printf("Backups will be transferred: %d", len(fileGroups)) + tracelog.InfoLogger.Printf("Files will be transferred: %d", filesCount) + return fileGroups, filesCount +} + +// linkGroup makes a linked group of files from the backup. The sentinel file is linked to data files so that it will be +// copied to the target storage only after them. 
In turn, data files are linked to the sentinel file so that they will +// be deleted from the source storage only after it. This is needed to move backups consistently and atomically. +func linkGroup(backup backupFiles) FilesGroup { + sentinelFile := FileToMove{ + path: backup.sentinel.GetName(), + } + filesToMove := make([]FileToMove, 0, len(backup.backupData)) + for _, obj := range backup.backupData { + dataFile := FileToMove{ + path: obj.GetName(), + deleteAfter: []string{sentinelFile.path}, + } + filesToMove = append(filesToMove, dataFile) + sentinelFile.copyAfter = append(sentinelFile.copyAfter, dataFile.path) + } + + return append(filesToMove, sentinelFile) +} diff --git a/internal/storagetools/transfer/backup_file_lister_test.go b/internal/storagetools/transfer/backup_file_lister_test.go new file mode 100644 index 000000000..5d7c9e75d --- /dev/null +++ b/internal/storagetools/transfer/backup_file_lister_test.go @@ -0,0 +1,239 @@ +package transfer + +import ( + "bytes" + "fmt" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wal-g/wal-g/pkg/storages/memory" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func TestBackupFileLister_ListFilesToMove(t *testing.T) { + defaultLister := func() (lister *BackupFileLister, source, target storage.Folder) { + lister = NewAllBackupsFileLister(false, 100, 100) + source = memory.NewFolder("source/", memory.NewStorage()) + target = memory.NewFolder("target/", memory.NewStorage()) + return + } + + backupPrefix := func(i int) string { + return fmt.Sprintf("basebackups_005/base_00%d", i+1) + } + + t.Run("list backup files in separate groups", func(t *testing.T) { + l, source, target := defaultLister() + + for i := 0; i < 2; i++ { + _ = source.PutObject(backupPrefix(i)+"/a", &bytes.Buffer{}) + _ = source.PutObject(backupPrefix(i)+"/b/c", &bytes.Buffer{}) + _ = source.PutObject(backupPrefix(i)+"_backup_stop_sentinel.json", &bytes.Buffer{}) + } + _ = source.PutObject("basebackups_005/non_backup_file", &bytes.Buffer{}) + + groups, num, err := l.ListFilesToMove(source, target) + assert.NoError(t, err) + + require.Len(t, groups, 2) + assert.Equal(t, 6, num) + sortGroups(groups) + for i, group := range groups { + sortFiles(group) + assert.Equal(t, + FilesGroup{ + FileToMove{ + path: backupPrefix(i) + "/a", + deleteAfter: []string{backupPrefix(i) + "_backup_stop_sentinel.json"}, + }, + FileToMove{ + path: backupPrefix(i) + "/b/c", + deleteAfter: []string{backupPrefix(i) + "_backup_stop_sentinel.json"}, + }, + FileToMove{ + path: backupPrefix(i) + "_backup_stop_sentinel.json", + copyAfter: []string{backupPrefix(i) + "/a", backupPrefix(i) + "/b/c"}, + }, + }, + groups[i], + ) + } + }) + + t.Run("exclude already existing files", func(t *testing.T) { + l, source, target := defaultLister() + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001/b", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = target.PutObject("basebackups_005/base_001/b", &bytes.Buffer{}) + + groups, num, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + assert.Equal(t, 2, num) + + require.Len(t, groups, 1) + sortFiles(groups[0]) + assert.Equal(t, + FilesGroup{ + FileToMove{ + path: "basebackups_005/base_001/a", + deleteAfter: []string{"basebackups_005/base_001_backup_stop_sentinel.json"}, + }, + FileToMove{ + path: "basebackups_005/base_001_backup_stop_sentinel.json", + 
copyAfter: []string{"basebackups_005/base_001/a"}, + }, + }, + groups[0], + ) + }) + + t.Run("include existing files when overwrite allowed", func(t *testing.T) { + l, source, target := defaultLister() + l.Overwrite = true + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001/b", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = target.PutObject("basebackups_005/base_001/b", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + + require.Len(t, groups, 1) + require.Len(t, groups[0], 3) + }) + + t.Run("list single backup if name is specified", func(t *testing.T) { + l, source, target := defaultLister() + l = NewSingleBackupFileLister("base_002", l.Overwrite, l.MaxFiles) + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = source.PutObject("basebackups_005/base_002/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_002_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = source.PutObject("basebackups_005/base_003/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_003_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + + require.Len(t, groups, 1) + require.Len(t, groups[0], 2) + require.Contains(t, groups[0][0].path, "basebackups_005/base_002") + }) + + t.Run("ignore max backups if name is specified", func(t *testing.T) { + l, source, target := defaultLister() + l.Name = "base_001" + l.MaxBackups = 0 + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + + require.Len(t, groups, 1) + }) + + t.Run("skip incomplete backups", func(t *testing.T) { + l, source, target := defaultLister() + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + + _ = source.PutObject("basebackups_005/base_002/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_002_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + + require.Len(t, groups, 1) + require.Len(t, groups[0], 2) + require.Contains(t, groups[0][0].path, "basebackups_005/base_002") + }) + + t.Run("skip empty backups", func(t *testing.T) { + l, source, target := defaultLister() + + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = source.PutObject("basebackups_005/base_002/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_002_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + + require.Len(t, groups, 1) + require.Len(t, groups[0], 2) + require.Contains(t, groups[0][0].path, "basebackups_005/base_002") + }) + + t.Run("limit number of files", func(t *testing.T) { + l, source, target := defaultLister() + l.MaxFiles = 3 + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = source.PutObject("basebackups_005/base_002/a", &bytes.Buffer{}) + _ = 
source.PutObject("basebackups_005/base_002_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, num, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + assert.Equal(t, 2, num) + + require.Len(t, groups, 1) + require.Len(t, groups[0], 2) + }) + + t.Run("list no backups if single backup has more than max files", func(t *testing.T) { + l, source, target := defaultLister() + l.MaxFiles = 1 + + _ = source.PutObject("basebackups_005/base_002/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_002_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, num, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + assert.Equal(t, 0, num) + + require.Len(t, groups, 0) + }) + + t.Run("limit number of backups", func(t *testing.T) { + l, source, target := defaultLister() + l.MaxBackups = 1 + + _ = source.PutObject("basebackups_005/base_001/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{}) + + _ = source.PutObject("basebackups_005/base_002/a", &bytes.Buffer{}) + _ = source.PutObject("basebackups_005/base_002_backup_stop_sentinel.json", &bytes.Buffer{}) + + groups, num, err := l.ListFilesToMove(source, target) + require.NoError(t, err) + assert.Equal(t, 2, num) + + require.Len(t, groups, 1) + }) +} + +func sortGroups(groups []FilesGroup) { + sort.Slice(groups, func(i, j int) bool { return groups[i][0].path < groups[j][0].path }) +} + +func sortFiles(group FilesGroup) { + sort.Slice(group, func(i, j int) bool { return group[i].path < group[j].path }) + for _, f := range group { + sort.Strings(f.copyAfter) + sort.Strings(f.deleteAfter) + } +} diff --git a/internal/storagetools/transfer/file_lister.go b/internal/storagetools/transfer/file_lister.go new file mode 100644 index 000000000..d03a6c87b --- /dev/null +++ b/internal/storagetools/transfer/file_lister.go @@ -0,0 +1,103 @@ +package transfer + +import ( + "fmt" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/pkg/storages/storage" + "github.com/wal-g/wal-g/utility" +) + +type FileLister interface { + ListFilesToMove(sourceStorage, targetStorage storage.Folder) (files []FilesGroup, num int, err error) +} + +// FilesGroup is an ordered set of files that must be transferred atomically +type FilesGroup []FileToMove + +type FileToMove struct { + path string + copyAfter []string + deleteAfter []string +} + +type RegularFileLister struct { + Prefix string + Overwrite bool + MaxFiles int +} + +func NewRegularFileLister(prefix string, overwrite bool, maxFiles int) *RegularFileLister { + return &RegularFileLister{ + Prefix: prefix, + Overwrite: overwrite, + MaxFiles: maxFiles, + } +} + +func (l *RegularFileLister) ListFilesToMove(source, target storage.Folder) (files []FilesGroup, num int, err error) { + missingFiles, err := listMissingFiles(source, target, l.Prefix, l.Overwrite) + if err != nil { + return nil, 0, err + } + limitedFiles := limitFiles(missingFiles, l.MaxFiles) + return limitedFiles, len(limitedFiles), nil +} + +func listMissingFiles(source, target storage.Folder, prefix string, overwrite bool) (map[string]storage.Object, error) { + targetFiles, err := storage.ListFolderRecursivelyWithPrefix(target, prefix) + if err != nil { + return nil, fmt.Errorf("list files in the target storage: %w", err) + } + sourceFiles, err := storage.ListFolderRecursivelyWithPrefix(source, prefix) + if err != nil { + return nil, fmt.Errorf("list files in the source storage: %w", err) + } + tracelog.InfoLogger.Printf("Total files in the source 
storage: %d", len(sourceFiles)) + + missingFiles := make(map[string]storage.Object, len(sourceFiles)) + for _, sourceFile := range sourceFiles { + missingFiles[sourceFile.GetName()] = sourceFile + } + for _, targetFile := range targetFiles { + sourceFile, presentInBothStorages := missingFiles[targetFile.GetName()] + if !presentInBothStorages { + continue + } + if overwrite { + logSizesDifference(sourceFile, targetFile) + } else { + delete(missingFiles, targetFile.GetName()) + } + } + tracelog.InfoLogger.Printf("Files missing in the target storage: %d", len(missingFiles)) + return missingFiles, nil +} + +func logSizesDifference(sourceFile, targetFile storage.Object) { + if sourceFile.GetSize() != targetFile.GetSize() { + tracelog.WarningLogger.Printf( + "File present in both storages and its size is different: %q (source %d bytes VS target %d bytes)", + targetFile.GetName(), + sourceFile.GetSize(), + targetFile.GetSize(), + ) + } +} + +func limitFiles(files map[string]storage.Object, max int) []FilesGroup { + count := 0 + fileGroups := make([]FilesGroup, 0, utility.Min(max, len(files))) + for _, file := range files { + if count >= max { + break + } + singleFileGroup := FilesGroup{ + FileToMove{path: file.GetName()}, + } + fileGroups = append(fileGroups, singleFileGroup) + count++ + } + tracelog.InfoLogger.Printf("Files will be transferred: %d", len(fileGroups)) + return fileGroups +} diff --git a/internal/storagetools/transfer/file_lister_test.go b/internal/storagetools/transfer/file_lister_test.go new file mode 100644 index 000000000..bab4a7ff0 --- /dev/null +++ b/internal/storagetools/transfer/file_lister_test.go @@ -0,0 +1,97 @@ +package transfer + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wal-g/wal-g/pkg/storages/memory" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func TestRegularFileLister_ListFilesToMove(t *testing.T) { + defaultLister := func() (lister *RegularFileLister, source, target storage.Folder) { + lister = NewRegularFileLister("/", false, 100) + source = memory.NewFolder("source/", memory.NewStorage()) + target = memory.NewFolder("target/", memory.NewStorage()) + return + } + + t.Run("list files from parent dir only", func(t *testing.T) { + l, source, target := defaultLister() + l.Prefix = "1/" + + _ = source.PutObject("1/a", &bytes.Buffer{}) + _ = source.PutObject("1/b", &bytes.Buffer{}) + _ = source.PutObject("2/a", &bytes.Buffer{}) + + groups, num, err := l.ListFilesToMove(source, target) + assert.NoError(t, err) + + require.Len(t, groups, 2) + assert.Equal(t, 2, num) + sortGroups(groups) + assert.Equal(t, FilesGroup{FileToMove{path: "1/a"}}, groups[0]) + assert.Equal(t, FilesGroup{FileToMove{path: "1/b"}}, groups[1]) + }) + + t.Run("exclude already existing files", func(t *testing.T) { + l, source, target := defaultLister() + + _ = source.PutObject("1", &bytes.Buffer{}) + _ = source.PutObject("2", &bytes.Buffer{}) + + _ = target.PutObject("1", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + assert.NoError(t, err) + + require.Len(t, groups, 1) + assert.Equal(t, FilesGroup{FileToMove{path: "2"}}, groups[0]) + }) + + t.Run("include existing files when overwrite allowed", func(t *testing.T) { + l, source, target := defaultLister() + l.Overwrite = true + + _ = source.PutObject("1", &bytes.Buffer{}) + _ = source.PutObject("2", &bytes.Buffer{}) + + _ = target.PutObject("1", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + 
assert.NoError(t, err) + + require.Len(t, groups, 2) + }) + + t.Run("dont include nonexistent files even when overwrite allowed", func(t *testing.T) { + l, source, target := defaultLister() + l.Overwrite = true + + _ = source.PutObject("2", &bytes.Buffer{}) + + _ = target.PutObject("1", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + assert.NoError(t, err) + + require.Len(t, groups, 1) + require.Len(t, groups[0], 1) + assert.Equal(t, "2", groups[0][0].path) + }) + + t.Run("limit number of files", func(t *testing.T) { + l, source, target := defaultLister() + l.MaxFiles = 1 + + _ = source.PutObject("1", &bytes.Buffer{}) + _ = source.PutObject("2", &bytes.Buffer{}) + + groups, _, err := l.ListFilesToMove(source, target) + assert.NoError(t, err) + + require.Len(t, groups, 1) + }) +} diff --git a/internal/storagetools/transfer/handler.go b/internal/storagetools/transfer/handler.go new file mode 100644 index 000000000..d52b5ea2d --- /dev/null +++ b/internal/storagetools/transfer/handler.go @@ -0,0 +1,368 @@ +package transfer + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + "sync/atomic" + "time" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal/multistorage" + "github.com/wal-g/wal-g/pkg/storages/storage" + "github.com/wal-g/wal-g/utility" +) + +type Handler struct { + source storage.Folder + target storage.Folder + fileLister FileLister + cfg *HandlerConfig + fileStatuses *sync.Map + filesLeft int32 + jobRequirements map[jobKey][]jobRequirement +} + +type HandlerConfig struct { + FailOnFirstErr bool + Concurrency int + AppearanceChecks uint + AppearanceChecksInterval time.Duration +} + +func NewHandler( + sourceStorage, targetStorage string, + fileLister FileLister, + cfg *HandlerConfig, +) (*Handler, error) { + sourceFolder, err := multistorage.ConfigureStorageFolder(sourceStorage) + if err != nil { + return nil, fmt.Errorf("configure source storage folder: %w", err) + } + targetFolder, err := multistorage.ConfigureStorageFolder(targetStorage) + if err != nil { + return nil, fmt.Errorf("configure target storage folder: %w", err) + } + + return &Handler{ + source: sourceFolder, + target: targetFolder, + fileLister: fileLister, + cfg: cfg, + fileStatuses: new(sync.Map), + jobRequirements: map[jobKey][]jobRequirement{}, + }, nil +} + +func (h *Handler) Handle() error { + files, filesNum, err := h.fileLister.ListFilesToMove(h.source, h.target) + if err != nil { + return err + } + + workersNum := utility.Min(h.cfg.Concurrency, len(files)) + return h.transferConcurrently(workersNum, files, filesNum) +} + +type transferJob struct { + key jobKey + prevCheck time.Time + performedChecks uint +} + +type jobKey struct { + filePath string + jobType jobType +} + +type jobType string + +const ( + jobTypeCopy jobType = "copy" + jobTypeWait jobType = "wait" + jobTypeDelete jobType = "delete" +) + +type jobRequirement struct { + filePath string + minStatus transferStatus +} + +type transferStatus int + +const ( + transferStatusFailed transferStatus = iota - 1 + transferStatusNew + transferStatusCopied + transferStatusAppeared + transferStatusDeleted +) + +func (ts transferStatus) String() string { + switch ts { + case transferStatusNew: + return "new" + case transferStatusCopied: + return "copied" + case transferStatusAppeared: + return "appeared" + case transferStatusDeleted: + return "deleted" + case transferStatusFailed: + return "failed" + default: + return "unknown" + } +} + +func (h *Handler) transferConcurrently(workers int, files []FilesGroup, 
filesNum int) (finErr error) { + jobsQueue := make(chan transferJob, filesNum) + h.filesLeft += int32(filesNum) + for _, group := range files { + for _, file := range group { + h.saveRequirements(file) + h.fileStatuses.Store(file.path, transferStatusNew) + jobsQueue <- transferJob{ + key: jobKey{ + jobType: jobTypeCopy, + filePath: file.path, + }, + } + } + } + + errs := make(chan error, len(files)) + + workersCtx, cancelWorkers := context.WithCancel(context.Background()) + cancelOnSignal(cancelWorkers) + workersWG := new(sync.WaitGroup) + workersWG.Add(workers) + for i := 0; i < workers; i++ { + go h.transferFilesWorker(workersCtx, jobsQueue, errs, workersWG) + } + + errsWG := new(sync.WaitGroup) + errsWG.Add(1) + go func() { + defer errsWG.Done() + errsNum := 0 + for e := range errs { + if h.cfg.FailOnFirstErr { + cancelWorkers() + finErr = e + break + } + tracelog.ErrorLogger.PrintError(e) + errsNum++ + finErr = fmt.Errorf("finished with %d errors", errsNum) + } + }() + + workersWG.Wait() + close(errs) + errsWG.Wait() + + return finErr +} + +func (h *Handler) saveRequirements(file FileToMove) { + for _, requiredFile := range file.copyAfter { + job := jobKey{ + filePath: file.path, + jobType: jobTypeCopy, + } + req := jobRequirement{ + filePath: requiredFile, + minStatus: transferStatusAppeared, + } + h.jobRequirements[job] = append(h.jobRequirements[job], req) + } + + for _, requiredFile := range file.deleteAfter { + job := jobKey{ + filePath: file.path, + jobType: jobTypeDelete, + } + req := jobRequirement{ + filePath: requiredFile, + minStatus: transferStatusDeleted, + } + h.jobRequirements[job] = append(h.jobRequirements[job], req) + } +} + +func cancelOnSignal(cancel context.CancelFunc) { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt) + go func() { + <-sigs + cancel() + }() +} + +func (h *Handler) transferFilesWorker( + ctx context.Context, + jobsQueue chan transferJob, + errs chan error, + wg *sync.WaitGroup, +) { + defer wg.Done() + + for { + var job transferJob + + select { + case job = <-jobsQueue: + // Go on + default: + // No more files to process, exit + return + } + + select { + case <-ctx.Done(): + // Processing has been canceled, exit + return + default: + // Go on + } + + var newJob *transferJob + ok, err := h.checkRequirements(job) + if ok { + switch job.key.jobType { + case jobTypeCopy: + newJob, err = h.copyFile(job) + case jobTypeWait: + newJob, err = h.waitFile(job) + case jobTypeDelete: + err = h.deleteFile(job) + } + } else { + // Repeat the same job if its requirements haven't yet satisfied + newJob = &job + } + + if err != nil { + atomic.AddInt32(&h.filesLeft, -1) + errs <- fmt.Errorf("error with file %q: %s failed: %w", job.key.filePath, job.key.jobType, err) + continue + } + + if newJob != nil { + // Enqueue file again to process it later + jobsQueue <- *newJob + continue + } + + atomic.AddInt32(&h.filesLeft, -1) + tracelog.InfoLogger.Printf("File is transferred (%d left): %q", atomic.LoadInt32(&h.filesLeft), job.key.filePath) + } +} + +func (h *Handler) checkRequirements(job transferJob) (ok bool, err error) { + for _, required := range h.jobRequirements[job.key] { + s, ok := h.fileStatuses.Load(required.filePath) + if !ok { + return false, fmt.Errorf("job has a nonexistent requirement") + } + actualStatus := s.(transferStatus) + if actualStatus == transferStatusFailed { + return false, fmt.Errorf( + "%s operation requires other file %q to be %s, but it's failed", + job.key.jobType, + required.filePath, + required.minStatus, + ) + } + if 
actualStatus < required.minStatus { + return false, nil + } + } + return true, nil +} + +func (h *Handler) copyFile(job transferJob) (newJob *transferJob, err error) { + content, err := h.source.ReadObject(job.key.filePath) + if err != nil { + return nil, fmt.Errorf("read file from the source storage: %w", err) + } + defer utility.LoggedClose(content, "close object content read from the source storage") + + err = h.target.PutObject(job.key.filePath, content) + if err != nil { + return nil, fmt.Errorf("write file to the target storage: %w", err) + } + + h.fileStatuses.Store(job.key.filePath, transferStatusCopied) + job.key.jobType = jobTypeWait + newJob = &job + + return newJob, nil +} + +func (h *Handler) waitFile(job transferJob) (newJob *transferJob, err error) { + var appeared bool + + skipCheck := h.cfg.AppearanceChecks == 0 + if skipCheck { + appeared = true + } else { + appeared, err = h.checkForAppearance(job.prevCheck, job.key.filePath) + if err != nil { + return nil, err + } + } + + if appeared { + h.fileStatuses.Store(job.key.filePath, transferStatusAppeared) + job.key.jobType = jobTypeDelete + newJob = &job + return newJob, nil + } + + performedChecks := 1 + job.performedChecks + if performedChecks >= h.cfg.AppearanceChecks { + return nil, fmt.Errorf( + "couldn't wait for the file to appear in the target storage (%d checks performed)", + h.cfg.AppearanceChecks, + ) + } + + tracelog.WarningLogger.Printf( + "Written file hasn't appeared in the target storage (check %d of %d)", + performedChecks, + h.cfg.AppearanceChecks, + ) + + job.prevCheck = time.Now() + job.performedChecks = performedChecks + newJob = &job + + return newJob, nil +} + +func (h *Handler) checkForAppearance(prevCheck time.Time, filePath string) (appeared bool, err error) { + nextCheck := prevCheck.Add(h.cfg.AppearanceChecksInterval) + waitTime := time.Until(nextCheck) + if waitTime > 0 { + time.Sleep(waitTime) + } + + appeared, err = h.target.Exists(filePath) + if err != nil { + return false, fmt.Errorf("check if file exists in the target storage: %w", err) + } + return appeared, nil +} + +func (h *Handler) deleteFile(job transferJob) error { + err := h.source.DeleteObjects([]string{job.key.filePath}) + if err != nil { + return fmt.Errorf("delete file from the source storage: %w", err) + } + h.fileStatuses.Store(job.key.filePath, transferStatusDeleted) + return nil +} diff --git a/internal/storagetools/transfer/handler_test.go b/internal/storagetools/transfer/handler_test.go new file mode 100644 index 000000000..3a56a020a --- /dev/null +++ b/internal/storagetools/transfer/handler_test.go @@ -0,0 +1,613 @@ +package transfer + +import ( + "bytes" + "fmt" + "io" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wal-g/wal-g/pkg/storages/memory" + "github.com/wal-g/wal-g/pkg/storages/memory/mock" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func TestTransferHandler_Handle_Backup(t *testing.T) { + defaultHandler := func() *Handler { + lister := NewAllBackupsFileLister(false, 1000, 4) + return &Handler{ + source: memory.NewFolder("source/", memory.NewStorage()), + target: memory.NewFolder("target/", memory.NewStorage()), + fileLister: lister, + cfg: &HandlerConfig{ + FailOnFirstErr: false, + Concurrency: 7, + AppearanceChecks: 100, + AppearanceChecksInterval: time.Millisecond, + }, + fileStatuses: new(sync.Map), + jobRequirements: map[jobKey][]jobRequirement{}, + } + } + + genBackupFiles := 
func(num, filesNum int) []string { + backupPrefix := fmt.Sprintf("basebackups_005/base_00%d", num) + files := []string{ + backupPrefix + "_backup_stop_sentinel.json", + } + for i := 1; i <= filesNum-1; i++ { + files = append(files, fmt.Sprintf("%s/tar_partitions/part_%d.tar", backupPrefix, i)) + } + return files + } + + t.Run("move all backups", func(t *testing.T) { + h := defaultHandler() + + for i := 1; i <= 4; i++ { + for _, f := range genBackupFiles(i, i*100) { + _ = h.source.PutObject(f, bytes.NewBufferString("abc")) + } + } + + err := h.Handle() + require.NoError(t, err) + + for i := 1; i <= 4; i++ { + for _, f := range genBackupFiles(i, i*100) { + exists, err := h.source.Exists(f) + require.NoError(t, err) + require.False(t, exists) + + exists, err = h.target.Exists(f) + require.NoError(t, err) + require.True(t, exists) + } + } + }) + + t.Run("operate backup files in correct order", func(t *testing.T) { + h := defaultHandler() + sourceMock := mock.NewFolder(memory.NewFolder("source/", memory.NewStorage())) + h.source = sourceMock + targetMock := mock.NewFolder(memory.NewFolder("target/", memory.NewStorage())) + h.target = targetMock + + for _, f := range genBackupFiles(1, 100) { + _ = h.source.PutObject(f, bytes.NewBufferString("abc")) + } + + var ( + dataFilesCopied = int32(0) + sentinelDeleted = false + ) + + targetMock.PutObjectMock = func(name string, content io.Reader) error { + if strings.HasSuffix(name, "_backup_stop_sentinel.json") { + if atomic.LoadInt32(&dataFilesCopied) < 99 { + t.Fatalf("sentinel file must be copied to target storage only after all other files") + } + return targetMock.MemFolder.PutObject(name, content) + } + go func() { + time.Sleep(time.Millisecond) + atomic.AddInt32(&dataFilesCopied, 1) + _ = targetMock.MemFolder.PutObject(name, content) + }() + return nil + } + sourceMock.DeleteObjectsMock = func(objectRelativePaths []string) error { + if strings.HasSuffix(objectRelativePaths[0], "_backup_stop_sentinel.json") { + sentinelDeleted = true + } else if !sentinelDeleted { + t.Fatalf("sentinel file must be deleted from source storage before all other files") + } + return sourceMock.MemFolder.DeleteObjects(objectRelativePaths) + } + + err := h.Handle() + require.NoError(t, err) + + for _, f := range genBackupFiles(1, 100) { + exists, err := h.source.Exists(f) + require.NoError(t, err) + require.False(t, exists) + + exists, err = h.target.Exists(f) + require.NoError(t, err) + require.True(t, exists) + } + }) +} + +func TestTransferHandler_Handle(t *testing.T) { + defaultHandler := func() *Handler { + return &Handler{ + source: memory.NewFolder("source/", memory.NewStorage()), + target: memory.NewFolder("target/", memory.NewStorage()), + fileLister: NewRegularFileLister("/", false, 100500), + cfg: &HandlerConfig{ + FailOnFirstErr: false, + Concurrency: 5, + AppearanceChecks: 3, + AppearanceChecksInterval: 0, + }, + fileStatuses: new(sync.Map), + jobRequirements: map[jobKey][]jobRequirement{}, + } + } + + countFiles := func(folder storage.Folder, max int) int { + found := 0 + for i := 0; i < max; i++ { + exists, err := folder.Exists(strconv.Itoa(i)) + assert.NoError(t, err) + if exists { + found++ + } + } + return found + } + + t.Run("move all nonexistent files", func(t *testing.T) { + h := defaultHandler() + h.fileLister.(*RegularFileLister).MaxFiles = 80 + + for i := 0; i < 100; i++ { + _ = h.source.PutObject(strconv.Itoa(i), &bytes.Buffer{}) + } + + for i := 0; i < 10; i++ { + _ = h.target.PutObject(strconv.Itoa(i), &bytes.Buffer{}) + } + + err := h.Handle() 
+ assert.NoError(t, err) + + assert.Equal(t, 90, countFiles(h.target, 100)) + assert.Equal(t, 20, countFiles(h.source, 100)) + }) + + t.Run("tolerate errors with some files", func(t *testing.T) { + targetMock := mock.NewFolder(memory.NewFolder("target/", memory.NewStorage())) + + putCalls := 0 + putCallsMux := new(sync.Mutex) + targetMock.PutObjectMock = func(name string, content io.Reader) error { + putCallsMux.Lock() + defer putCallsMux.Unlock() + putCalls++ + if putCalls%5 == 0 { + return fmt.Errorf("test") + } + return targetMock.MemFolder.PutObject(name, content) + } + + h := defaultHandler() + h.target = targetMock + + for i := 0; i < 100; i++ { + _ = h.source.PutObject(strconv.Itoa(i), &bytes.Buffer{}) + } + + err := h.Handle() + require.Error(t, err) + assert.Contains(t, err.Error(), "finished with 20 errors") + + assert.Equal(t, 80, countFiles(h.target, 100)) + assert.Equal(t, 20, countFiles(h.source, 100)) + }) + + t.Run("fail after first error if it is configured so", func(t *testing.T) { + sourceMock := mock.NewFolder(memory.NewFolder("source/", memory.NewStorage())) + + delCalls := 0 + dellCallsMux := new(sync.Mutex) + sourceMock.DeleteObjectsMock = func(paths []string) error { + dellCallsMux.Lock() + defer dellCallsMux.Unlock() + delCalls++ + if delCalls > 15 { + return fmt.Errorf("test") + } + return sourceMock.MemFolder.DeleteObjects(paths) + } + + h := defaultHandler() + h.source = sourceMock + h.cfg.FailOnFirstErr = true + + for i := 0; i < 100; i++ { + _ = h.source.PutObject(strconv.Itoa(i), &bytes.Buffer{}) + } + + err := h.Handle() + require.Error(t, err) + assert.Contains(t, err.Error(), "delete file") + + assert.Equal(t, 100, countFiles(h.target, 100)) + assert.Equal(t, 85, countFiles(h.source, 100)) + }) +} + +func TestTransferHandler_saveRequirements(t *testing.T) { + h := &Handler{jobRequirements: map[jobKey][]jobRequirement{}} + file := FileToMove{ + path: "1", + copyAfter: []string{"2", "3"}, + deleteAfter: []string{"4", "5"}, + } + + copyJobKey := jobKey{ + filePath: "1", + jobType: jobTypeCopy, + } + deleteJobKey := jobKey{ + filePath: "1", + jobType: jobTypeDelete, + } + + h.saveRequirements(file) + + assert.Equal(t, + []jobRequirement{ + { + filePath: "2", + minStatus: transferStatusAppeared, + }, + { + filePath: "3", + minStatus: transferStatusAppeared, + }, + }, + h.jobRequirements[copyJobKey], + ) + assert.Equal(t, + []jobRequirement{ + { + filePath: "4", + minStatus: transferStatusDeleted, + }, + { + filePath: "5", + minStatus: transferStatusDeleted, + }, + }, + h.jobRequirements[deleteJobKey], + ) +} + +func TestTransferHandler_checkRequirements(t *testing.T) { + h := &Handler{ + jobRequirements: map[jobKey][]jobRequirement{ + jobKey{filePath: "1", jobType: jobTypeDelete}: { + { + filePath: "2", + minStatus: transferStatusCopied, + }, + }, + jobKey{filePath: "2", jobType: jobTypeDelete}: { + { + filePath: "3", + minStatus: transferStatusAppeared, + }, + }, + jobKey{filePath: "3", jobType: jobTypeDelete}: { + { + filePath: "4", + minStatus: transferStatusAppeared, + }, + }, + }, + fileStatuses: new(sync.Map), + } + h.fileStatuses.Store("2", transferStatusAppeared) + h.fileStatuses.Store("3", transferStatusCopied) + h.fileStatuses.Store("4", transferStatusFailed) + + t.Run("true if requirements are satisfied", func(t *testing.T) { + ok, err := h.checkRequirements(transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeDelete, + }, + }) + require.NoError(t, err) + assert.True(t, ok) + }) + + t.Run("false is requirements are not satisfied", func(t 
*testing.T) { + ok, err := h.checkRequirements(transferJob{ + key: jobKey{ + filePath: "2", + jobType: jobTypeDelete, + }, + }) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("throw error when required file is failed", func(t *testing.T) { + _, err := h.checkRequirements(transferJob{ + key: jobKey{ + filePath: "3", + jobType: jobTypeDelete, + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), `delete operation requires other file "4" to be appeared, but it's failed`) + }) +} + +func TestTransferHandler_copyFile(t *testing.T) { + defaultHandler := func() *Handler { + return &Handler{ + source: memory.NewFolder("source/", memory.NewStorage()), + target: memory.NewFolder("target/", memory.NewStorage()), + fileStatuses: new(sync.Map), + } + } + + t.Run("write new file", func(t *testing.T) { + h := defaultHandler() + + _ = h.source.PutObject("1", bytes.NewBufferString("source")) + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeCopy, + }, + } + + _, err := h.copyFile(job) + require.NoError(t, err) + + file, err := h.target.ReadObject("1") + assert.NoError(t, err) + content, _ := io.ReadAll(file) + assert.Equal(t, "source", string(content)) + }) + + t.Run("overwrite existing file", func(t *testing.T) { + h := defaultHandler() + + _ = h.source.PutObject("1", bytes.NewBufferString("source")) + _ = h.target.PutObject("1", bytes.NewBufferString("target")) + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeCopy, + }, + } + + _, err := h.copyFile(job) + require.NoError(t, err) + + file, err := h.target.ReadObject("1") + assert.NoError(t, err) + content, _ := io.ReadAll(file) + assert.Equal(t, "source", string(content)) + }) + + t.Run("provide new wait job and update status", func(t *testing.T) { + h := defaultHandler() + + _ = h.source.PutObject("1", bytes.NewBufferString("source")) + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeCopy, + }, + } + + newJob, err := h.copyFile(job) + require.NoError(t, err) + + wantJob := &transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeWait, + }, + } + assert.Equal(t, wantJob, newJob) + + status, ok := h.fileStatuses.Load("1") + require.True(t, ok) + assert.Equal(t, transferStatusCopied, status) + }) + + t.Run("handle read err", func(t *testing.T) { + h := defaultHandler() + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeCopy, + }, + } + + _, err := h.copyFile(job) + require.Error(t, err) + assert.Contains(t, err.Error(), "read file") + }) +} + +func TestTransferHandler_aitFile(t *testing.T) { + defaultHandler := func() *Handler { + return &Handler{ + source: memory.NewFolder("source/", memory.NewStorage()), + target: memory.NewFolder("target/", memory.NewStorage()), + fileStatuses: new(sync.Map), + cfg: &HandlerConfig{ + AppearanceChecks: 3, + AppearanceChecksInterval: 0, + }, + } + } + + t.Run("provide wait job again if file has not appeared", func(t *testing.T) { + h := defaultHandler() + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeWait, + }, + prevCheck: time.Time{}, + performedChecks: 0, + } + + newJob, err := h.waitFile(job) + assert.NoError(t, err) + assert.NotNil(t, newJob) + assert.Equal(t, "1", newJob.key.filePath) + assert.Equal(t, jobTypeWait, newJob.key.jobType) + assert.NotEqual(t, time.Time{}, newJob.prevCheck) + assert.Equal(t, uint(1), newJob.performedChecks) + _, ok := h.fileStatuses.Load("1") + assert.False(t, ok) + }) + + t.Run("provide delete job if file has appeared", func(t 
*testing.T) { + h := defaultHandler() + + _ = h.target.PutObject("1", &bytes.Buffer{}) + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeWait, + }, + prevCheck: time.Time{}, + performedChecks: 0, + } + + newJob, err := h.waitFile(job) + assert.NoError(t, err) + assert.NotNil(t, newJob) + assert.Equal(t, "1", newJob.key.filePath) + assert.Equal(t, jobTypeDelete, newJob.key.jobType) + status, ok := h.fileStatuses.Load("1") + assert.True(t, ok) + assert.Equal(t, transferStatusAppeared, status) + }) + + t.Run("provide delete job if checking is turned off", func(t *testing.T) { + h := defaultHandler() + h.cfg.AppearanceChecks = 0 + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeWait, + }, + prevCheck: time.Time{}, + performedChecks: 0, + } + + newJob, err := h.waitFile(job) + assert.NoError(t, err) + assert.NotNil(t, newJob) + assert.Equal(t, "1", newJob.key.filePath) + assert.Equal(t, jobTypeDelete, newJob.key.jobType) + }) + + t.Run("throw error when checks number exceeded", func(t *testing.T) { + h := defaultHandler() + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeWait, + }, + prevCheck: time.Time{}, + performedChecks: 0, + } + + for i := 0; i < 2; i++ { + newJob, err := h.waitFile(job) + assert.NoError(t, err) + require.NotNil(t, newJob) + assert.Equal(t, uint(i+1), newJob.performedChecks) + job = *newJob + } + _, err := h.waitFile(job) + require.Error(t, err) + assert.Contains(t, err.Error(), "couldn't wait for the file to appear") + }) +} + +func TestTransferHandler_deleteFile(t *testing.T) { + h := &Handler{ + source: memory.NewFolder("source/", memory.NewStorage()), + target: memory.NewFolder("target/", memory.NewStorage()), + fileStatuses: new(sync.Map), + } + + _ = h.source.PutObject("1", &bytes.Buffer{}) + + job := transferJob{ + key: jobKey{ + filePath: "1", + jobType: jobTypeDelete, + }, + } + + err := h.deleteFile(job) + require.NoError(t, err) + + exists, err := h.source.Exists("1") + require.NoError(t, err) + assert.False(t, exists) + + status, ok := h.fileStatuses.Load("1") + assert.True(t, ok) + assert.Equal(t, transferStatusDeleted, status) +} + +func TestTransferHandler_checkForAppearance(t *testing.T) { + t.Run("wait until next check time", func(t *testing.T) { + h := &Handler{ + target: memory.NewFolder("target/", memory.NewStorage()), + cfg: &HandlerConfig{ + AppearanceChecksInterval: 100 * time.Millisecond, + }, + } + + _ = h.target.PutObject("1", &bytes.Buffer{}) + + thisCheckTime := time.Now() + prevCheckTime := thisCheckTime.Add(-50 * time.Millisecond) + + appeared, err := h.checkForAppearance(prevCheckTime, "1") + assert.GreaterOrEqual(t, time.Now(), thisCheckTime.Add(50*time.Millisecond)) + assert.NoError(t, err) + assert.True(t, appeared) + }) + + t.Run("dont wait if time has come", func(t *testing.T) { + h := &Handler{ + target: memory.NewFolder("target/", memory.NewStorage()), + cfg: &HandlerConfig{ + AppearanceChecksInterval: time.Hour, + }, + } + + _ = h.target.PutObject("1", &bytes.Buffer{}) + + prevCheckTime := time.Now().Add(-time.Hour) + + appeared, err := h.checkForAppearance(prevCheckTime, "1") + assert.NoError(t, err) + assert.True(t, appeared) + }) +} diff --git a/internal/storagetools/transfer_handler.go b/internal/storagetools/transfer_handler.go deleted file mode 100644 index 080df3073..000000000 --- a/internal/storagetools/transfer_handler.go +++ /dev/null @@ -1,308 +0,0 @@ -package storagetools - -import ( - "context" - "fmt" - "os" - "os/signal" - "sync" - "time" - - 
"github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal/multistorage" - "github.com/wal-g/wal-g/pkg/storages/storage" - "github.com/wal-g/wal-g/utility" -) - -type TransferHandler struct { - source storage.Folder - target storage.Folder - cfg *TransferHandlerConfig -} - -type TransferHandlerConfig struct { - Prefix string - Overwrite bool - FailOnFirstErr bool - Concurrency int - MaxFiles int - AppearanceChecks uint - AppearanceChecksInterval time.Duration -} - -func NewTransferHandler(sourceStorage, targetStorage string, cfg *TransferHandlerConfig) (*TransferHandler, error) { - sourceFolder, err := multistorage.ConfigureStorageFolder(sourceStorage) - if err != nil { - return nil, fmt.Errorf("can't configure source storage folder: %w", err) - } - targetFolder, err := multistorage.ConfigureStorageFolder(targetStorage) - if err != nil { - return nil, fmt.Errorf("can't configure target storage folder: %w", err) - } - - return &TransferHandler{ - source: sourceFolder, - target: targetFolder, - cfg: cfg, - }, nil -} - -func (h *TransferHandler) Handle() error { - files, err := h.listFilesToMove() - if err != nil { - return err - } - - workersNum := utility.Min(h.cfg.Concurrency, len(files)) - return h.transferConcurrently(workersNum, files) -} - -func (h *TransferHandler) listFilesToMove() ([]storage.Object, error) { - targetFiles, err := storage.ListFolderRecursivelyWithPrefix(h.target, h.cfg.Prefix) - if err != nil { - return nil, fmt.Errorf("can't list files in the target storage: %w", err) - } - sourceFiles, err := storage.ListFolderRecursivelyWithPrefix(h.source, h.cfg.Prefix) - if err != nil { - return nil, fmt.Errorf("can't list files in the source storage: %w", err) - } - tracelog.InfoLogger.Printf("Total files in the source storage: %d", len(sourceFiles)) - - missingFiles := make(map[string]storage.Object, len(sourceFiles)) - for _, sourceFile := range sourceFiles { - missingFiles[sourceFile.GetName()] = sourceFile - } - for _, targetFile := range targetFiles { - sourceFile, presentInBothStorages := missingFiles[targetFile.GetName()] - if !presentInBothStorages { - continue - } - if h.cfg.Overwrite { - logSizesDifference(sourceFile, targetFile) - } else { - delete(missingFiles, targetFile.GetName()) - } - } - tracelog.InfoLogger.Printf("Files missing in the target storage: %d", len(missingFiles)) - - count := 0 - limitedFiles := make([]storage.Object, 0, utility.Min(h.cfg.MaxFiles, len(missingFiles))) - for _, file := range missingFiles { - if count >= h.cfg.MaxFiles { - break - } - limitedFiles = append(limitedFiles, file) - count++ - } - tracelog.InfoLogger.Printf("Files will be transferred: %d", len(limitedFiles)) - - return limitedFiles, nil -} - -func logSizesDifference(sourceFile, targetFile storage.Object) { - if sourceFile.GetSize() != targetFile.GetSize() { - tracelog.WarningLogger.Printf( - "File present in both storages and its size is different: %q (source %d bytes VS target %d bytes)", - targetFile.GetName(), - sourceFile.GetSize(), - targetFile.GetSize(), - ) - } -} - -type transferJob struct { - jobType transferJobType - filePath string - prevCheck time.Time - performedChecks uint -} - -type transferJobType int - -const ( - transferJobTypeCopy transferJobType = iota - transferJobTypeDelete -) - -func (h *TransferHandler) transferConcurrently(workers int, files []storage.Object) (finErr error) { - jobsQueue := make(chan transferJob, len(files)) - for _, f := range files { - jobsQueue <- transferJob{ - jobType: transferJobTypeCopy, - filePath: f.GetName(), - } - } - 
- errs := make(chan error, len(files)) - - workersCtx, cancelWorkers := context.WithCancel(context.Background()) - cancelOnSignal(cancelWorkers) - workersWG := new(sync.WaitGroup) - workersWG.Add(workers) - for i := 0; i < workers; i++ { - go h.transferFilesWorker(workersCtx, jobsQueue, errs, workersWG) - } - - errsWG := new(sync.WaitGroup) - errsWG.Add(1) - go func() { - defer errsWG.Done() - errsNum := 0 - for e := range errs { - if h.cfg.FailOnFirstErr { - cancelWorkers() - finErr = e - break - } - tracelog.ErrorLogger.PrintError(e) - errsNum++ - finErr = fmt.Errorf("finished with %d errors", errsNum) - } - }() - - workersWG.Wait() - close(errs) - errsWG.Wait() - - return finErr -} - -func cancelOnSignal(cancel context.CancelFunc) { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt) - go func() { - <-sigs - cancel() - }() -} - -func (h *TransferHandler) transferFilesWorker( - ctx context.Context, - jobsQueue chan transferJob, - errs chan error, - wg *sync.WaitGroup, -) { - defer wg.Done() - - for { - var job transferJob - - select { - case job = <-jobsQueue: - // Go on - default: - // No more files to process, exit - return - } - - select { - case <-ctx.Done(): - // Processing has been canceled, exit - return - default: - // Go on - } - - var newJob *transferJob - var err error - - switch job.jobType { - case transferJobTypeCopy: - newJob, err = h.copyFile(job) - case transferJobTypeDelete: - newJob, err = h.deleteFile(job) - } - - if err != nil { - errs <- fmt.Errorf("error with file %q: %w", job.filePath, err) - continue - } - - if newJob != nil { - // Enqueue file again to process it later - jobsQueue <- *newJob - continue - } - - tracelog.InfoLogger.Printf("File is transferred (%d left): %q", len(jobsQueue), job.filePath) - } -} - -func (h *TransferHandler) copyFile(job transferJob) (newJob *transferJob, err error) { - content, err := h.source.ReadObject(job.filePath) - if err != nil { - return nil, fmt.Errorf("can't read file from the source storage: %w", err) - } - defer utility.LoggedClose(content, "can't close object content read from the source storage") - - err = h.target.PutObject(job.filePath, content) - if err != nil { - return nil, fmt.Errorf("can't write file to the target storage: %w", err) - } - - newJob = &transferJob{ - jobType: transferJobTypeDelete, - filePath: job.filePath, - } - - return newJob, nil -} - -func (h *TransferHandler) deleteFile(job transferJob) (newJob *transferJob, err error) { - var appeared bool - - skipCheck := h.cfg.AppearanceChecks == 0 - if skipCheck { - appeared = true - } else { - appeared, err = h.checkForAppearance(job.prevCheck, job.filePath) - if err != nil { - return nil, err - } - } - - if appeared { - err = h.source.DeleteObjects([]string{job.filePath}) - if err != nil { - return nil, fmt.Errorf("can't delete file from the source storage: %w", err) - } - return nil, nil - } - - performedChecks := 1 + job.performedChecks - if performedChecks >= h.cfg.AppearanceChecks { - return nil, fmt.Errorf( - "couldn't wait for the file to appear in the target storage (%d checks performed)", - h.cfg.AppearanceChecks, - ) - } - - tracelog.WarningLogger.Printf( - "Written file hasn't appeared in the target storage (check %d of %d)", - performedChecks, - h.cfg.AppearanceChecks, - ) - - newJob = &transferJob{ - jobType: transferJobTypeDelete, - filePath: job.filePath, - prevCheck: time.Now(), - performedChecks: performedChecks, - } - - return newJob, nil -} - -func (h *TransferHandler) checkForAppearance(prevCheck time.Time, filePath 
string) (appeared bool, err error) { - nextCheck := prevCheck.Add(h.cfg.AppearanceChecksInterval) - waitTime := time.Until(nextCheck) - if waitTime > 0 { - time.Sleep(waitTime) - } - - appeared, err = h.target.Exists(filePath) - if err != nil { - return false, fmt.Errorf("can't check if file exists in the target storage: %w", err) - } - return appeared, nil -} diff --git a/internal/storagetools/transfer_handler_test.go b/internal/storagetools/transfer_handler_test.go deleted file mode 100644 index d0e06edff..000000000 --- a/internal/storagetools/transfer_handler_test.go +++ /dev/null @@ -1,458 +0,0 @@ -package storagetools - -import ( - "bytes" - "fmt" - "io" - "strconv" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/wal-g/wal-g/pkg/storages/memory" - "github.com/wal-g/wal-g/pkg/storages/memory/mock" - "github.com/wal-g/wal-g/pkg/storages/storage" -) - -func TestTransferHandler_Handle(t *testing.T) { - countFiles := func(folder storage.Folder, max int) int { - found := 0 - for i := 0; i < max; i++ { - exists, err := folder.Exists(strconv.Itoa(i)) - assert.NoError(t, err) - if exists { - found++ - } - } - return found - } - - t.Run("move all nonexistent files", func(t *testing.T) { - h := TransferHandler{ - source: memory.NewFolder("source/", memory.NewStorage()), - target: memory.NewFolder("target/", memory.NewStorage()), - cfg: &TransferHandlerConfig{ - Prefix: "/", - Overwrite: false, - FailOnFirstErr: false, - Concurrency: 5, - MaxFiles: 80, - AppearanceChecks: 3, - AppearanceChecksInterval: 0, - }, - } - - for i := 0; i < 100; i++ { - _ = h.source.PutObject(strconv.Itoa(i), &bytes.Buffer{}) - } - - for i := 0; i < 10; i++ { - _ = h.target.PutObject(strconv.Itoa(i), &bytes.Buffer{}) - } - - err := h.Handle() - assert.NoError(t, err) - - assert.Equal(t, 90, countFiles(h.target, 100)) - assert.Equal(t, 20, countFiles(h.source, 100)) - }) - - t.Run("tolerate errors with some files", func(t *testing.T) { - target := mock.NewFolder(memory.NewFolder("target/", memory.NewStorage())) - - putCalls := 0 - putCallsMux := new(sync.Mutex) - target.PutObjectMock = func(name string, content io.Reader) error { - putCallsMux.Lock() - defer putCallsMux.Unlock() - putCalls++ - if putCalls%5 == 0 { - return fmt.Errorf("test") - } - return target.MemFolder.PutObject(name, content) - } - - h := TransferHandler{ - source: memory.NewFolder("source/", memory.NewStorage()), - target: target, - cfg: &TransferHandlerConfig{ - Prefix: "/", - Overwrite: false, - FailOnFirstErr: false, - Concurrency: 5, - MaxFiles: 100500, - AppearanceChecks: 3, - AppearanceChecksInterval: 0, - }, - } - - for i := 0; i < 100; i++ { - _ = h.source.PutObject(strconv.Itoa(i), &bytes.Buffer{}) - } - - err := h.Handle() - require.Error(t, err) - assert.Contains(t, err.Error(), "finished with 20 errors") - - assert.Equal(t, 80, countFiles(h.target, 100)) - assert.Equal(t, 20, countFiles(h.source, 100)) - }) - - t.Run("fail after first error if its configured so", func(t *testing.T) { - source := mock.NewFolder(memory.NewFolder("source/", memory.NewStorage())) - - delCalls := 0 - dellCallsMux := new(sync.Mutex) - source.DeleteObjectsMock = func(paths []string) error { - dellCallsMux.Lock() - defer dellCallsMux.Unlock() - delCalls++ - if delCalls > 15 { - return fmt.Errorf("test") - } - return source.MemFolder.DeleteObjects(paths) - } - - h := TransferHandler{ - source: source, - target: memory.NewFolder("target/", memory.NewStorage()), - cfg: 
&TransferHandlerConfig{ - Prefix: "/", - Overwrite: false, - FailOnFirstErr: true, - Concurrency: 5, - MaxFiles: 100500, - AppearanceChecks: 3, - AppearanceChecksInterval: 0, - }, - } - - for i := 0; i < 100; i++ { - _ = h.source.PutObject(strconv.Itoa(i), &bytes.Buffer{}) - } - - err := h.Handle() - require.Error(t, err) - assert.Contains(t, err.Error(), "can't delete file") - - assert.Equal(t, 100, countFiles(h.target, 100)) - assert.Equal(t, 85, countFiles(h.source, 100)) - }) -} - -func TestTransferHandler_listFilesToMove(t *testing.T) { - defaultHandler := func() *TransferHandler { - return &TransferHandler{ - source: memory.NewFolder("source/", memory.NewStorage()), - target: memory.NewFolder("target/", memory.NewStorage()), - cfg: &TransferHandlerConfig{ - Prefix: "/", - Overwrite: false, - MaxFiles: 100, - }, - } - } - - t.Run("list files from parent dir only", func(t *testing.T) { - h := defaultHandler() - h.cfg.Prefix = "1/" - - _ = h.source.PutObject("1/a", &bytes.Buffer{}) - _ = h.source.PutObject("2/a", &bytes.Buffer{}) - - files, err := h.listFilesToMove() - assert.NoError(t, err) - - require.Len(t, files, 1) - assert.Equal(t, "1/a", files[0].GetName()) - }) - - t.Run("exclude already existing files", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", &bytes.Buffer{}) - _ = h.source.PutObject("2", &bytes.Buffer{}) - - _ = h.target.PutObject("1", &bytes.Buffer{}) - - files, err := h.listFilesToMove() - assert.NoError(t, err) - - require.Len(t, files, 1) - assert.Equal(t, "2", files[0].GetName()) - }) - - t.Run("include existing files when overwrite allowed", func(t *testing.T) { - h := defaultHandler() - h.cfg.Overwrite = true - - _ = h.source.PutObject("1", &bytes.Buffer{}) - _ = h.source.PutObject("2", &bytes.Buffer{}) - - _ = h.target.PutObject("1", &bytes.Buffer{}) - - files, err := h.listFilesToMove() - assert.NoError(t, err) - - require.Len(t, files, 2) - }) - - t.Run("dont include nonexistent files even when overwrite allowed", func(t *testing.T) { - h := defaultHandler() - h.cfg.Overwrite = true - - _ = h.source.PutObject("2", &bytes.Buffer{}) - - _ = h.target.PutObject("1", &bytes.Buffer{}) - - files, err := h.listFilesToMove() - assert.NoError(t, err) - - require.Len(t, files, 1) - assert.Equal(t, "2", files[0].GetName()) - }) - - t.Run("limit number of files", func(t *testing.T) { - h := defaultHandler() - h.cfg.MaxFiles = 1 - - _ = h.source.PutObject("1", &bytes.Buffer{}) - _ = h.source.PutObject("2", &bytes.Buffer{}) - - files, err := h.listFilesToMove() - assert.NoError(t, err) - - require.Len(t, files, 1) - }) -} - -func TestTransferHandler_copyFile(t *testing.T) { - defaultHandler := func() *TransferHandler { - return &TransferHandler{ - source: memory.NewFolder("source/", memory.NewStorage()), - target: memory.NewFolder("target/", memory.NewStorage()), - } - } - - t.Run("write new file", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", bytes.NewBufferString("source")) - - job := transferJob{ - jobType: transferJobTypeCopy, - filePath: "1", - } - - _, err := h.copyFile(job) - require.NoError(t, err) - - file, err := h.target.ReadObject("1") - assert.NoError(t, err) - content, _ := io.ReadAll(file) - assert.Equal(t, "source", string(content)) - }) - - t.Run("overwrite existing file", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", bytes.NewBufferString("source")) - _ = h.target.PutObject("1", bytes.NewBufferString("target")) - - job := transferJob{ - jobType: transferJobTypeCopy, 
- filePath: "1", - } - - _, err := h.copyFile(job) - require.NoError(t, err) - - file, err := h.target.ReadObject("1") - assert.NoError(t, err) - content, _ := io.ReadAll(file) - assert.Equal(t, "source", string(content)) - }) - - t.Run("provide new delete job", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", bytes.NewBufferString("source")) - - job := transferJob{ - jobType: transferJobTypeCopy, - filePath: "1", - } - - newJob, err := h.copyFile(job) - require.NoError(t, err) - - wantJob := &transferJob{ - jobType: transferJobTypeDelete, - filePath: "1", - } - assert.Equal(t, wantJob, newJob) - }) - - t.Run("handle read err", func(t *testing.T) { - h := defaultHandler() - - job := transferJob{ - jobType: transferJobTypeCopy, - filePath: "1", - } - - _, err := h.copyFile(job) - require.Error(t, err) - assert.Contains(t, err.Error(), "can't read file") - }) -} - -func TestTransferHandler_deleteFile(t *testing.T) { - defaultHandler := func() *TransferHandler { - return &TransferHandler{ - source: memory.NewFolder("source/", memory.NewStorage()), - target: memory.NewFolder("target/", memory.NewStorage()), - cfg: &TransferHandlerConfig{ - Prefix: "/", - Overwrite: false, - AppearanceChecks: 3, - AppearanceChecksInterval: 0, - }, - } - } - - t.Run("check for appearance before deleting", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", &bytes.Buffer{}) - - job := transferJob{ - jobType: transferJobTypeDelete, - filePath: "1", - prevCheck: time.Time{}, - performedChecks: 0, - } - - newJob, err := h.deleteFile(job) - assert.NoError(t, err) - assert.NotNil(t, newJob) - assert.Equal(t, "1", newJob.filePath) - assert.Equal(t, transferJobTypeDelete, newJob.jobType) - assert.NotEqual(t, time.Time{}, newJob.prevCheck) - assert.Equal(t, uint(1), newJob.performedChecks) - - _, err = h.source.ReadObject("1") - assert.NoError(t, err) - }) - - t.Run("delete file if it has appeared", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", &bytes.Buffer{}) - _ = h.target.PutObject("1", &bytes.Buffer{}) - - job := transferJob{ - jobType: transferJobTypeDelete, - filePath: "1", - prevCheck: time.Time{}, - performedChecks: 0, - } - - newJob, err := h.deleteFile(job) - assert.NoError(t, err) - assert.Nil(t, newJob) - - exists, err := h.source.Exists("1") - assert.NoError(t, err) - assert.False(t, exists) - }) - - t.Run("delete file instantly if checking is turned off", func(t *testing.T) { - h := defaultHandler() - h.cfg.AppearanceChecks = 0 - - _ = h.source.PutObject("1", &bytes.Buffer{}) - - job := transferJob{ - jobType: transferJobTypeDelete, - filePath: "1", - prevCheck: time.Time{}, - performedChecks: 0, - } - - newJob, err := h.deleteFile(job) - assert.NoError(t, err) - assert.Nil(t, newJob) - - exists, err := h.source.Exists("1") - assert.NoError(t, err) - assert.False(t, exists) - }) - - t.Run("throw error when checks number exceeded", func(t *testing.T) { - h := defaultHandler() - - _ = h.source.PutObject("1", &bytes.Buffer{}) - - job := transferJob{ - jobType: transferJobTypeDelete, - filePath: "1", - prevCheck: time.Time{}, - performedChecks: 0, - } - - for i := 0; i < 2; i++ { - newJob, err := h.deleteFile(job) - assert.NoError(t, err) - require.NotNil(t, newJob) - assert.Equal(t, uint(i+1), newJob.performedChecks) - job = *newJob - } - _, err := h.deleteFile(job) - require.Error(t, err) - assert.Contains(t, err.Error(), "couldn't wait for the file to appear") - - _, err = h.source.ReadObject("1") - assert.NoError(t, err) - 
}) -} - -func TestTransferHandler_checkForAppearance(t *testing.T) { - t.Run("wait until next check time", func(t *testing.T) { - h := &TransferHandler{ - target: memory.NewFolder("target/", memory.NewStorage()), - cfg: &TransferHandlerConfig{ - AppearanceChecksInterval: 100 * time.Millisecond, - }, - } - - _ = h.target.PutObject("1", &bytes.Buffer{}) - - thisCheckTime := time.Now() - prevCheckTime := thisCheckTime.Add(-50 * time.Millisecond) - - appeared, err := h.checkForAppearance(prevCheckTime, "1") - assert.GreaterOrEqual(t, time.Now(), thisCheckTime.Add(50*time.Millisecond)) - assert.NoError(t, err) - assert.True(t, appeared) - }) - - t.Run("dont wait if time has come", func(t *testing.T) { - h := &TransferHandler{ - target: memory.NewFolder("target/", memory.NewStorage()), - cfg: &TransferHandlerConfig{ - AppearanceChecksInterval: time.Hour, - }, - } - - _ = h.target.PutObject("1", &bytes.Buffer{}) - - prevCheckTime := time.Now().Add(-time.Hour) - - appeared, err := h.checkForAppearance(prevCheckTime, "1") - assert.NoError(t, err) - assert.True(t, appeared) - }) -}
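
Reviewer note: below is a minimal, illustrative sketch (not part of this diff) of the ordering guarantee produced by the new backup lister. It is written as a test in the transfer package, reusing the in-memory storage helpers already used by the tests above; the backup name and file paths are invented for the example.

package transfer

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/wal-g/wal-g/pkg/storages/memory"
)

// Example: for a backup consisting of one data file and its sentinel, the lister
// produces a single group in which the sentinel comes last, may only be copied
// after the data file has appeared in the target storage, and must be deleted
// before the data file is removed from the source storage.
func TestBackupFileLister_sentinelOrderingExample(t *testing.T) {
	source := memory.NewFolder("source/", memory.NewStorage())
	target := memory.NewFolder("target/", memory.NewStorage())

	_ = source.PutObject("basebackups_005/base_001/tar_partitions/part_1.tar", &bytes.Buffer{})
	_ = source.PutObject("basebackups_005/base_001_backup_stop_sentinel.json", &bytes.Buffer{})

	lister := NewSingleBackupFileLister("base_001", false, 100)
	groups, filesNum, err := lister.ListFilesToMove(source, target)
	require.NoError(t, err)
	require.Equal(t, 2, filesNum)
	require.Len(t, groups, 1)

	group := groups[0]
	sentinel := group[len(group)-1]
	assert.Equal(t, "basebackups_005/base_001_backup_stop_sentinel.json", sentinel.path)
	// The sentinel is copied to the target only after the data file has appeared there.
	assert.Equal(t, []string{"basebackups_005/base_001/tar_partitions/part_1.tar"}, sentinel.copyAfter)
	// The data file is deleted from the source only after the sentinel has been deleted.
	assert.Equal(t, []string{"basebackups_005/base_001_backup_stop_sentinel.json"}, group[0].deleteAfter)
}

Keeping data files in the source until the sentinel is gone, and keeping the sentinel out of the target until all data files are readable there, is what makes an interrupted transfer safe to re-run: at no point does either storage expose a sentinel without its data.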