From 35987d113dee8aa74c6e7a5aa7730368ae775554 Mon Sep 17 00:00:00 2001 From: Daniil Zakhlystov <47750602+usernamedt@users.noreply.github.com> Date: Thu, 11 May 2023 21:38:36 +0200 Subject: [PATCH] [Postgres] Add failover storages for wal-push and wal-fetch (#1466) --- .github/workflows/dockertests.yml | 1 + cmd/common/st/cat_object.go | 9 +- cmd/common/st/check.go | 13 +- cmd/common/st/delete_object.go | 9 +- cmd/common/st/folder_list.go | 16 +- cmd/common/st/get_object.go | 13 +- cmd/common/st/put_object.go | 7 +- cmd/common/st/st.go | 7 + cmd/pg/wal_fetch.go | 10 +- cmd/pg/wal_prefetch.go | 10 +- cmd/pg/wal_push.go | 21 +- docker-compose.yml | 12 + ...kerfile_full_backup_failover_storages_test | 3 + ..._backup_failover_storages_test_config.json | 4 + ...ailover_storages_test_config_failover.json | 17 ++ .../full_backup_failover_storages_test.sh | 87 +++++++ .../remote_backup_and_restore_test.sh | 3 + docs/PostgreSQL.md | 28 ++ internal/config.go | 244 +++++++++++------- internal/configure.go | 6 +- .../databases/mongo/archive/loader_test.go | 2 +- .../sqlserver/backup_export_handler.go | 2 +- .../sqlserver/backup_import_handler.go | 2 +- .../sqlserver/backup_push_handler.go | 2 +- internal/multistorage/alive_checker.go | 74 ++++++ internal/multistorage/alive_storage_cache.go | 123 +++++++++ internal/multistorage/executer.go | 72 ++++++ internal/multistorage/failover_folder.go | 41 +++ internal/multistorage/folder_reader.go | 132 ++++++++++ internal/multistorage/uploader.go | 62 +++++ internal/storagetools/cat_object_handler.go | 9 +- internal/storagetools/check.go | 14 +- .../storagetools/delete_object_handler.go | 17 +- internal/storagetools/folder_list_handler.go | 13 +- internal/storagetools/get_object_handler.go | 17 +- internal/storagetools/put_object_handler.go | 47 +++- internal/stream_push_helper_test.go | 2 +- internal/uploader.go | 4 +- testtools/util.go | 8 +- 39 files changed, 979 insertions(+), 184 deletions(-) create mode 100644 docker/pg_tests/Dockerfile_full_backup_failover_storages_test create mode 100755 docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config.json create mode 100755 docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config_failover.json create mode 100755 docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh create mode 100644 internal/multistorage/alive_checker.go create mode 100644 internal/multistorage/alive_storage_cache.go create mode 100644 internal/multistorage/executer.go create mode 100644 internal/multistorage/failover_folder.go create mode 100644 internal/multistorage/folder_reader.go create mode 100644 internal/multistorage/uploader.go diff --git a/.github/workflows/dockertests.yml b/.github/workflows/dockertests.yml index 5d7e32039..9aec7c068 100644 --- a/.github/workflows/dockertests.yml +++ b/.github/workflows/dockertests.yml @@ -86,6 +86,7 @@ jobs: 'make TEST="pg_full_backup_copy_composer_test" pg_integration_test', 'make TEST="pg_full_backup_rating_composer_test" pg_integration_test', 'make TEST="pg_full_backup_database_composer_test" pg_integration_test', + 'make TEST="pg_full_backup_failover_storage_test" pg_integration_test', 'make TEST="pg_delete_before_name_find_full_test" pg_integration_test', 'make TEST="pg_delete_retain_full_test" pg_integration_test', 'make TEST="pg_delete_before_time_find_full_test" pg_integration_test', diff --git a/cmd/common/st/cat_object.go b/cmd/common/st/cat_object.go index 196c1d010..fec9be92e 100644 --- a/cmd/common/st/cat_object.go +++ 
b/cmd/common/st/cat_object.go @@ -3,8 +3,9 @@ package st import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/internal/storagetools" + "github.com/wal-g/wal-g/pkg/storages/storage" ) const ( @@ -22,10 +23,10 @@ var catObjectCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { objectPath := args[0] - folder, err := internal.ConfigureFolder() + err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + return storagetools.HandleCatObject(objectPath, folder, decrypt, decompress) + }) tracelog.ErrorLogger.FatalOnError(err) - - storagetools.HandleCatObject(objectPath, folder, decrypt, decompress) }, } diff --git a/cmd/common/st/check.go b/cmd/common/st/check.go index bf0503360..d7e98443b 100644 --- a/cmd/common/st/check.go +++ b/cmd/common/st/check.go @@ -3,8 +3,9 @@ package st import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/internal/storagetools" + "github.com/wal-g/wal-g/pkg/storages/storage" ) var checkCmd = &cobra.Command{ @@ -17,9 +18,10 @@ var checkReadCmd = &cobra.Command{ Short: "check read access to the storage", Args: cobra.MinimumNArgs(0), Run: func(cmd *cobra.Command, args []string) { - folder, err := internal.ConfigureFolder() + err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + return storagetools.HandleCheckRead(folder, args) + }) tracelog.ErrorLogger.FatalOnError(err) - storagetools.HandleCheckRead(folder, args) }, } @@ -28,9 +30,10 @@ var checkWriteCmd = &cobra.Command{ Short: "check write access to the storage", Args: cobra.NoArgs, Run: func(cmd *cobra.Command, args []string) { - folder, err := internal.ConfigureFolder() + err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + return storagetools.HandleCheckWrite(folder) + }) tracelog.ErrorLogger.FatalOnError(err) - storagetools.HandleCheckWrite(folder) }, } diff --git a/cmd/common/st/delete_object.go b/cmd/common/st/delete_object.go index 107ba85e2..3e3440220 100644 --- a/cmd/common/st/delete_object.go +++ b/cmd/common/st/delete_object.go @@ -3,8 +3,9 @@ package st import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/internal/storagetools" + "github.com/wal-g/wal-g/pkg/storages/storage" ) const deleteObjectShortDescription = "Delete the specified storage object" @@ -15,10 +16,10 @@ var deleteObjectCmd = &cobra.Command{ Short: deleteObjectShortDescription, Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { - folder, err := internal.ConfigureFolder() + err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + return storagetools.HandleDeleteObject(args[0], folder) + }) tracelog.ErrorLogger.FatalOnError(err) - - storagetools.HandleDeleteObject(args[0], folder) }, } diff --git a/cmd/common/st/folder_list.go b/cmd/common/st/folder_list.go index e149202a8..57131557e 100644 --- a/cmd/common/st/folder_list.go +++ b/cmd/common/st/folder_list.go @@ -3,8 +3,9 @@ package st import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/internal/storagetools" + "github.com/wal-g/wal-g/pkg/storages/storage" ) 
const folderListShortDescription = "Prints objects in the provided storage folder" @@ -17,14 +18,13 @@ var folderListCmd = &cobra.Command{ Short: folderListShortDescription, Args: cobra.RangeArgs(0, 1), Run: func(cmd *cobra.Command, args []string) { - folder, err := internal.ConfigureFolder() + err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + if len(args) > 0 { + folder = folder.GetSubFolder(args[0]) + } + return storagetools.HandleFolderList(folder, recursive) + }) tracelog.ErrorLogger.FatalOnError(err) - - if len(args) > 0 { - folder = folder.GetSubFolder(args[0]) - } - - storagetools.HandleFolderList(folder, recursive) }, } diff --git a/cmd/common/st/get_object.go b/cmd/common/st/get_object.go index 65d2eb7e8..a180744c0 100644 --- a/cmd/common/st/get_object.go +++ b/cmd/common/st/get_object.go @@ -3,8 +3,9 @@ package st import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" - "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/internal/storagetools" + "github.com/wal-g/wal-g/pkg/storages/storage" ) const ( @@ -23,10 +24,14 @@ var getObjectCmd = &cobra.Command{ objectPath := args[0] dstPath := args[1] - folder, err := internal.ConfigureFolder() - tracelog.ErrorLogger.FatalOnError(err) + if targetStorage == "all" { + tracelog.ErrorLogger.Fatalf("'all' target is not supported for st get command") + } - storagetools.HandleGetObject(objectPath, dstPath, folder, !noDecrypt, !noDecompress) + err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + return storagetools.HandleGetObject(objectPath, dstPath, folder, !noDecrypt, !noDecompress) + }) + tracelog.ErrorLogger.FatalOnError(err) }, } diff --git a/cmd/common/st/put_object.go b/cmd/common/st/put_object.go index 22e406a13..ad348b82e 100644 --- a/cmd/common/st/put_object.go +++ b/cmd/common/st/put_object.go @@ -4,7 +4,9 @@ import ( "github.com/spf13/cobra" "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/internal/storagetools" + "github.com/wal-g/wal-g/pkg/storages/storage" ) const ( @@ -28,7 +30,10 @@ var putObjectCmd = &cobra.Command{ localPath := args[0] dstPath := args[1] - storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, !noEncrypt, !noCompress) + err = multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error { + return storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, !noEncrypt, !noCompress) + }) + tracelog.ErrorLogger.FatalOnError(err) }, } diff --git a/cmd/common/st/st.go b/cmd/common/st/st.go index 734590530..9706e2a2a 100644 --- a/cmd/common/st/st.go +++ b/cmd/common/st/st.go @@ -2,6 +2,7 @@ package st import ( "github.com/spf13/cobra" + "github.com/wal-g/wal-g/internal/multistorage" ) // Storage tools allows to interact with the configured storage, e.g.: @@ -20,4 +21,10 @@ var ( Long: "Storage tools allows to interact with the configured storage. 
" + "Be aware that this command can do potentially harmful operations and make sure that you know what you're doing.", } + targetStorage string ) + +func init() { + StorageToolsCmd.PersistentFlags().StringVarP(&targetStorage, "target", "t", multistorage.DefaultStorage, + "execute for specific failover storage (Postgres only)") +} diff --git a/cmd/pg/wal_fetch.go b/cmd/pg/wal_fetch.go index a700011bb..348734bb6 100644 --- a/cmd/pg/wal_fetch.go +++ b/cmd/pg/wal_fetch.go @@ -5,6 +5,7 @@ import ( "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/internal" "github.com/wal-g/wal-g/internal/databases/postgres" + "github.com/wal-g/wal-g/internal/multistorage" ) const WalFetchShortDescription = "Fetches a WAL file from storage" @@ -17,7 +18,14 @@ var walFetchCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { folder, err := internal.ConfigureFolder() tracelog.ErrorLogger.FatalOnError(err) - postgres.HandleWALFetch(internal.NewFolderReader(folder), args[0], args[1], true) + + failover, err := internal.InitFailoverStorages() + tracelog.ErrorLogger.FatalOnError(err) + + folderReader, err := multistorage.NewStorageFolderReader(folder, failover) + tracelog.ErrorLogger.FatalOnError(err) + + postgres.HandleWALFetch(folderReader, args[0], args[1], true) }, } diff --git a/cmd/pg/wal_prefetch.go b/cmd/pg/wal_prefetch.go index bdbe3e97b..7c36bfc8e 100644 --- a/cmd/pg/wal_prefetch.go +++ b/cmd/pg/wal_prefetch.go @@ -5,6 +5,7 @@ import ( "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/internal" "github.com/wal-g/wal-g/internal/databases/postgres" + "github.com/wal-g/wal-g/internal/multistorage" ) const WalPrefetchShortDescription = `Used for prefetching process forking @@ -19,7 +20,14 @@ var WalPrefetchCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { folder, err := internal.ConfigureFolder() tracelog.ErrorLogger.FatalOnError(err) - postgres.HandleWALPrefetch(internal.NewFolderReader(folder), args[0], args[1]) + + failover, err := internal.InitFailoverStorages() + tracelog.ErrorLogger.FatalOnError(err) + + folderReader, err := multistorage.NewStorageFolderReader(folder, failover) + tracelog.ErrorLogger.FatalOnError(err) + + postgres.HandleWALPrefetch(folderReader, args[0], args[1]) }, } diff --git a/cmd/pg/wal_push.go b/cmd/pg/wal_push.go index a3003f7bb..d13f066a6 100644 --- a/cmd/pg/wal_push.go +++ b/cmd/pg/wal_push.go @@ -6,6 +6,7 @@ import ( "github.com/wal-g/wal-g/internal" "github.com/wal-g/wal-g/internal/asm" "github.com/wal-g/wal-g/internal/databases/postgres" + "github.com/wal-g/wal-g/internal/multistorage" "github.com/wal-g/wal-g/utility" ) @@ -20,27 +21,33 @@ var walPushCmd = &cobra.Command{ baseUploader, err := internal.ConfigureUploader() tracelog.ErrorLogger.FatalOnError(err) - uploader, err := postgres.ConfigureWalUploader(baseUploader) + failover, err := internal.InitFailoverStorages() + tracelog.ErrorLogger.FatalOnError(err) + + uploader, err := multistorage.NewUploader(baseUploader, failover) + tracelog.ErrorLogger.FatalOnError(err) + + walUploader, err := postgres.ConfigureWalUploader(uploader) tracelog.ErrorLogger.FatalOnError(err) archiveStatusManager, err := internal.ConfigureArchiveStatusManager() if err == nil { - uploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager) + walUploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager) } else { tracelog.ErrorLogger.PrintError(err) - uploader.ArchiveStatusManager = asm.NewNopASM() + walUploader.ArchiveStatusManager = asm.NewNopASM() } PGArchiveStatusManager, err := 
internal.ConfigurePGArchiveStatusManager() if err == nil { - uploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager) + walUploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager) } else { tracelog.ErrorLogger.PrintError(err) - uploader.PGArchiveStatusManager = asm.NewNopASM() + walUploader.PGArchiveStatusManager = asm.NewNopASM() } - uploader.ChangeDirectory(utility.WalPath) - err = postgres.HandleWALPush(uploader, args[0]) + walUploader.ChangeDirectory(utility.WalPath) + err = postgres.HandleWALPush(walUploader, args[0]) tracelog.ErrorLogger.FatalOnError(err) }, } diff --git a/docker-compose.yml b/docker-compose.yml index 2b415b15b..ee3dd8f53 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,7 @@ services: && mkdir -p /export/fullcopycomposerbucket && mkdir -p /export/fullwithoutfilesmetadatabucket && mkdir -p /export/fullscandeltabucket + && mkdir -p /export/failoverstoragebucket && mkdir -p /export/partialbucket && mkdir -p /export/remotebucket && mkdir -p /export/remotewithoutfilesmetadatabucket @@ -269,6 +270,17 @@ services: links: - s3 + pg_full_backup_failover_storage_test: + build: + dockerfile: docker/pg_tests/Dockerfile_full_backup_failover_storages_test + context: . + image: wal-g/full_backup_failover_storage_test + container_name: wal-g_pg_full_backup_failover_storage_test + depends_on: + - s3 + links: + - s3 + pg_full_backup_streamed_test: build: dockerfile: docker/pg_tests/Dockerfile_full_backup_streamed_test diff --git a/docker/pg_tests/Dockerfile_full_backup_failover_storages_test b/docker/pg_tests/Dockerfile_full_backup_failover_storages_test new file mode 100644 index 000000000..95e83ab86 --- /dev/null +++ b/docker/pg_tests/Dockerfile_full_backup_failover_storages_test @@ -0,0 +1,3 @@ +FROM wal-g/docker_prefix:latest + +CMD su postgres -c "/tmp/tests/full_backup_failover_storages_test.sh" diff --git a/docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config.json b/docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config.json new file mode 100755 index 000000000..c04a61450 --- /dev/null +++ b/docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config.json @@ -0,0 +1,4 @@ +"WALE_S3_PREFIX": "s3://failoverstoragebucket", +"WALG_DELTA_MAX_STEPS": "6", +"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY", +"WALG_LOG_LEVEL": "DEVEL" \ No newline at end of file diff --git a/docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config_failover.json b/docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config_failover.json new file mode 100755 index 000000000..a22179663 --- /dev/null +++ b/docker/pg_tests/scripts/configs/full_backup_failover_storages_test_config_failover.json @@ -0,0 +1,17 @@ +"WALE_S3_PREFIX": "s3://nonexistingbucket", +"WALG_DELTA_MAX_STEPS": "6", +"WALG_LOG_LEVEL": "DEVEL", +"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY", +"WALG_FAILOVER_STORAGES": { + "not_working_failover": { + "AWS_SECRET_ACCESS_KEY": "useless_access_key", + "AWS_ACCESS_KEY_ID": "incorrect_access_key_id", + "WALE_S3_PREFIX": "s3://some-useless-s3-prefix" + }, + "good_failover": { + "AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + "AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE", + "WALE_S3_PREFIX": "s3://failoverstoragebucket" + } +}, +"WALG_FAILOVER_STORAGES_CHECK_TIMEOUT": "5s" diff --git a/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh b/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh new file mode 100755 index 
000000000..8b62f228e --- /dev/null +++ b/docker/pg_tests/scripts/tests/full_backup_failover_storages_test.sh @@ -0,0 +1,87 @@ +#!/bin/sh +set -e -x +CONFIG_FILE="/tmp/configs/full_backup_failover_storages_test_config.json" +ARCHIVE_CONFIG_FILE="/tmp/configs/full_backup_failover_storages_test_config_failover.json" +COMMON_CONFIG="/tmp/configs/common_config.json" +TMP_CONFIG="/tmp/configs/tmp_config.json" +ARCHIVE_TMP_CONFIG="/tmp/configs/archive_tmp_config.json" +cat ${CONFIG_FILE} > ${TMP_CONFIG} +echo "," >> ${TMP_CONFIG} +cat ${COMMON_CONFIG} >> ${TMP_CONFIG} +/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG} + +cat ${ARCHIVE_CONFIG_FILE} > ${ARCHIVE_TMP_CONFIG} +echo "," >> ${ARCHIVE_TMP_CONFIG} +cat ${COMMON_CONFIG} >> ${ARCHIVE_TMP_CONFIG} +/tmp/scripts/wrap_config_file.sh ${ARCHIVE_TMP_CONFIG} + +/usr/lib/postgresql/10/bin/initdb ${PGDATA} + +echo "archive_mode = on" >> ${PGDATA}/postgresql.conf +echo "archive_command = '/usr/bin/timeout 600 wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push %p'" >> ${PGDATA}/postgresql.conf +echo "archive_timeout = 600" >> ${PGDATA}/postgresql.conf + +/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start + +/tmp/scripts/wait_while_pg_not_ready.sh + +wal-g --config=${TMP_CONFIG} delete everything FORCE --confirm + +pgbench -i -s 5 postgres +pg_dumpall -f /tmp/dump1 +pgbench -c 2 -T 100000000 -S & +sleep 1 +wal-g --config=${TMP_CONFIG} backup-push ${PGDATA} +/tmp/scripts/drop_pg.sh + +wal-g --config=${TMP_CONFIG} backup-fetch ${PGDATA} LATEST + +echo "restore_command = 'echo \"WAL file restoration: %f, %p\"&& wal-g --config=${ARCHIVE_TMP_CONFIG} wal-fetch \"%f\" \"%p\"'" > ${PGDATA}/recovery.conf + +wal-g --config=${ARCHIVE_TMP_CONFIG} st ls -r --target all + +/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start +/tmp/scripts/wait_while_pg_not_ready.sh +pg_dumpall -f /tmp/dump2 + +diff /tmp/dump1 /tmp/dump2 + +psql -f /tmp/scripts/amcheck.sql -v "ON_ERROR_STOP=1" postgres + +echo "Full backup success!!!!!!" + +# We also test WAL overwrite prevention here as part of the regular backup functionality +# First, test that .history files prevent overwrite even if WALG_PREVENT_WAL_OVERWRITE is false + +export WALG_PREVENT_WAL_OVERWRITE=false + +echo test > ${PGDATA}/pg_wal/test_file.history +wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push ${PGDATA}/pg_wal/test_file.history +wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push ${PGDATA}/pg_wal/test_file.history + +echo test1 > ${PGDATA}/pg_wal/test_file.history +wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push ${PGDATA}/pg_wal/test_file.history && EXIT_STATUS=$? || EXIT_STATUS=$? + +if [ "$EXIT_STATUS" -eq 0 ] ; then + echo "Error: Duplicate .history with different content was pushed" + exit 1 +fi + +export WALG_PREVENT_WAL_OVERWRITE=true + +echo test > ${PGDATA}/pg_wal/test_file +wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push ${PGDATA}/pg_wal/test_file +wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push ${PGDATA}/pg_wal/test_file + +echo test1 > ${PGDATA}/pg_wal/test_file +wal-g --config=${ARCHIVE_TMP_CONFIG} wal-push ${PGDATA}/pg_wal/test_file && EXIT_STATUS=$? || EXIT_STATUS=$? + +if [ "$EXIT_STATUS" -eq 0 ] ; then + echo "Error: Duplicate WAL with different content was pushed" + exit 1 +fi + +/tmp/scripts/drop_pg.sh +rm ${TMP_CONFIG} + +echo "Prevent WAL overwrite success!!!!!!"
diff --git a/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh b/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh index 4a2db18dd..25f8306f7 100755 --- a/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh +++ b/docker/pg_tests/scripts/tests/test_functions/remote_backup_and_restore_test.sh @@ -54,6 +54,9 @@ remote_backup_and_restore_test() { rm -rf "${PGTBS}"/* rm -rf "${PGDATA}" + echo Debug + wal-g --config=${TMP_CONFIG} st ls -r + echo Restore destination BACKUP=$(wal-g --config=${TMP_CONFIG} backup-list | sed -n '2{s/ .*//;p}') wal-g --config=${TMP_CONFIG} backup-fetch "$PGDATA" "$BACKUP" diff --git a/docs/PostgreSQL.md b/docs/PostgreSQL.md index 41e512da0..b0622ad31 100644 --- a/docs/PostgreSQL.md +++ b/docs/PostgreSQL.md @@ -579,3 +579,31 @@ Usage: ```bash wal-g pgbackrest wal-show ``` + +Failover archive storages (experimental) ----------- + +Switch to a failover storage for `wal-push` if the primary storage becomes unavailable. This helps avoid out-of-disk-space issues when archiving fails because the cloud storage service is temporarily unavailable. +WAL-G also takes the failover storages into account during `wal-fetch`/`wal-prefetch`. + +```yaml +WALG_FAILOVER_STORAGES: + TEST_STORAGE: + AWS_SECRET_ACCESS_KEY: "S3_STORAGE_KEY_1" + AWS_ACCESS_KEY_ID: "S3_STORAGE_KEY_ID_1" + WALE_S3_PREFIX: "s3://some-s3-storage-1/" + STORAGE2: + AWS_SECRET_ACCESS_KEY: "S3_STORAGE_KEY_2" + AWS_ACCESS_KEY_ID: "S3_STORAGE_KEY_ID_2" + WALE_S3_PREFIX: "s3://some-s3-storage-2/" + FILE_STORAGE: + WALG_FILE_PREFIX: "/some/prefix" +``` + +* `WALG_FAILOVER_STORAGES_CHECK_TIMEOUT` + +WAL-G spends no more than this amount of time checking for alive storages. The default value is `30s`. + +* `WALG_FAILOVER_STORAGES_CACHE_LIFETIME` + +WAL-G saves information about the last used alive storage to disk to avoid excessive storage calls. This setting controls the lifetime of that cache. The default value is `15m`.
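The cache mentioned above is a small JSON file, `$HOME/.walg_failover_storage_cache`, managed by the new `internal/multistorage/alive_storage_cache.go` below. As a rough illustration (field names taken from the struct's JSON tags in that file; the storage name and timestamp here are made up for the example), its contents look like:

```json
{
  "last_ok_storage": "good_failover",
  "upd_ts": "2023-05-11T21:38:36Z"
}
```

While the cache is fresh (younger than `WALG_FAILOVER_STORAGES_CACHE_LIFETIME`), both the uploader and the folder reader reuse `last_ok_storage` instead of re-running the alive checks.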
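Although this docs section does not mention it, the patch also threads the failover storages through the storage tools: `cmd/common/st/st.go` above adds a persistent `--target`/`-t` flag, and the integration test exercises it with `st ls -r --target all`. Illustrative invocations (the storage name refers to the YAML example above):

```bash
# list objects in every storage: the default one plus all failovers
wal-g st ls -r --target all

# list objects in one specific failover storage
wal-g st ls -r --target TEST_STORAGE

# with the flag omitted, the default (primary) storage is used
wal-g st ls -r
```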
diff --git a/internal/config.go b/internal/config.go index c3973e308..58cfa22e9 100644 --- a/internal/config.go +++ b/internal/config.go @@ -9,6 +9,8 @@ import ( "sort" "strings" + "github.com/wal-g/wal-g/internal/limiters" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -27,67 +29,70 @@ const ( MONGO = "MONGO" GP = "GP" - DownloadConcurrencySetting = "WALG_DOWNLOAD_CONCURRENCY" - UploadConcurrencySetting = "WALG_UPLOAD_CONCURRENCY" - UploadDiskConcurrencySetting = "WALG_UPLOAD_DISK_CONCURRENCY" - UploadQueueSetting = "WALG_UPLOAD_QUEUE" - SentinelUserDataSetting = "WALG_SENTINEL_USER_DATA" - PreventWalOverwriteSetting = "WALG_PREVENT_WAL_OVERWRITE" - UploadWalMetadata = "WALG_UPLOAD_WAL_METADATA" - DeltaMaxStepsSetting = "WALG_DELTA_MAX_STEPS" - DeltaOriginSetting = "WALG_DELTA_ORIGIN" - CompressionMethodSetting = "WALG_COMPRESSION_METHOD" - StoragePrefixSetting = "WALG_STORAGE_PREFIX" - DiskRateLimitSetting = "WALG_DISK_RATE_LIMIT" - NetworkRateLimitSetting = "WALG_NETWORK_RATE_LIMIT" - UseWalDeltaSetting = "WALG_USE_WAL_DELTA" - UseReverseUnpackSetting = "WALG_USE_REVERSE_UNPACK" - SkipRedundantTarsSetting = "WALG_SKIP_REDUNDANT_TARS" - VerifyPageChecksumsSetting = "WALG_VERIFY_PAGE_CHECKSUMS" - StoreAllCorruptBlocksSetting = "WALG_STORE_ALL_CORRUPT_BLOCKS" - UseRatingComposerSetting = "WALG_USE_RATING_COMPOSER" - UseCopyComposerSetting = "WALG_USE_COPY_COMPOSER" - UseDatabaseComposerSetting = "WALG_USE_DATABASE_COMPOSER" - WithoutFilesMetadataSetting = "WALG_WITHOUT_FILES_METADATA" - DeltaFromNameSetting = "WALG_DELTA_FROM_NAME" - DeltaFromUserDataSetting = "WALG_DELTA_FROM_USER_DATA" - FetchTargetUserDataSetting = "WALG_FETCH_TARGET_USER_DATA" - LogLevelSetting = "WALG_LOG_LEVEL" - TarSizeThresholdSetting = "WALG_TAR_SIZE_THRESHOLD" - TarDisableFsyncSetting = "WALG_TAR_DISABLE_FSYNC" - CseKmsIDSetting = "WALG_CSE_KMS_ID" - CseKmsRegionSetting = "WALG_CSE_KMS_REGION" - LibsodiumKeySetting = "WALG_LIBSODIUM_KEY" - LibsodiumKeyPathSetting = "WALG_LIBSODIUM_KEY_PATH" - LibsodiumKeyTransform = "WALG_LIBSODIUM_KEY_TRANSFORM" - GpgKeyIDSetting = "GPG_KEY_ID" - PgpKeySetting = "WALG_PGP_KEY" - PgpKeyPathSetting = "WALG_PGP_KEY_PATH" - PgpKeyPassphraseSetting = "WALG_PGP_KEY_PASSPHRASE" - PgDataSetting = "PGDATA" - UserSetting = "USER" // TODO : do something with it - PgPortSetting = "PGPORT" - PgUserSetting = "PGUSER" - PgHostSetting = "PGHOST" - PgPasswordSetting = "PGPASSWORD" - PgPassfileSetting = "PGPASSFILE" - PgDatabaseSetting = "PGDATABASE" - PgSslModeSetting = "PGSSLMODE" - PgSlotName = "WALG_SLOTNAME" - PgWalSize = "WALG_PG_WAL_SIZE" - TotalBgUploadedLimit = "TOTAL_BG_UPLOADED_LIMIT" - NameStreamCreateCmd = "WALG_STREAM_CREATE_COMMAND" - NameStreamRestoreCmd = "WALG_STREAM_RESTORE_COMMAND" - MaxDelayedSegmentsCount = "WALG_INTEGRITY_MAX_DELAYED_WALS" - PrefetchDir = "WALG_PREFETCH_DIR" - PgReadyRename = "PG_READY_RENAME" - SerializerTypeSetting = "WALG_SERIALIZER_TYPE" - StreamSplitterPartitions = "WALG_STREAM_SPLITTER_PARTITIONS" - StreamSplitterBlockSize = "WALG_STREAM_SPLITTER_BLOCK_SIZE" - StreamSplitterMaxFileSize = "WALG_STREAM_SPLITTER_MAX_FILE_SIZE" - StatsdAddressSetting = "WALG_STATSD_ADDRESS" - PgAliveCheckInterval = "WALG_ALIVE_CHECK_INTERVAL" - PgStopBackupTimeout = "WALG_STOP_BACKUP_TIMEOUT" + DownloadConcurrencySetting = "WALG_DOWNLOAD_CONCURRENCY" + UploadConcurrencySetting = "WALG_UPLOAD_CONCURRENCY" + UploadDiskConcurrencySetting = "WALG_UPLOAD_DISK_CONCURRENCY" + UploadQueueSetting = "WALG_UPLOAD_QUEUE" + SentinelUserDataSetting = 
"WALG_SENTINEL_USER_DATA" + PreventWalOverwriteSetting = "WALG_PREVENT_WAL_OVERWRITE" + UploadWalMetadata = "WALG_UPLOAD_WAL_METADATA" + DeltaMaxStepsSetting = "WALG_DELTA_MAX_STEPS" + DeltaOriginSetting = "WALG_DELTA_ORIGIN" + CompressionMethodSetting = "WALG_COMPRESSION_METHOD" + StoragePrefixSetting = "WALG_STORAGE_PREFIX" + DiskRateLimitSetting = "WALG_DISK_RATE_LIMIT" + NetworkRateLimitSetting = "WALG_NETWORK_RATE_LIMIT" + UseWalDeltaSetting = "WALG_USE_WAL_DELTA" + UseReverseUnpackSetting = "WALG_USE_REVERSE_UNPACK" + SkipRedundantTarsSetting = "WALG_SKIP_REDUNDANT_TARS" + VerifyPageChecksumsSetting = "WALG_VERIFY_PAGE_CHECKSUMS" + StoreAllCorruptBlocksSetting = "WALG_STORE_ALL_CORRUPT_BLOCKS" + UseRatingComposerSetting = "WALG_USE_RATING_COMPOSER" + UseCopyComposerSetting = "WALG_USE_COPY_COMPOSER" + UseDatabaseComposerSetting = "WALG_USE_DATABASE_COMPOSER" + WithoutFilesMetadataSetting = "WALG_WITHOUT_FILES_METADATA" + DeltaFromNameSetting = "WALG_DELTA_FROM_NAME" + DeltaFromUserDataSetting = "WALG_DELTA_FROM_USER_DATA" + FetchTargetUserDataSetting = "WALG_FETCH_TARGET_USER_DATA" + LogLevelSetting = "WALG_LOG_LEVEL" + TarSizeThresholdSetting = "WALG_TAR_SIZE_THRESHOLD" + TarDisableFsyncSetting = "WALG_TAR_DISABLE_FSYNC" + CseKmsIDSetting = "WALG_CSE_KMS_ID" + CseKmsRegionSetting = "WALG_CSE_KMS_REGION" + LibsodiumKeySetting = "WALG_LIBSODIUM_KEY" + LibsodiumKeyPathSetting = "WALG_LIBSODIUM_KEY_PATH" + LibsodiumKeyTransform = "WALG_LIBSODIUM_KEY_TRANSFORM" + GpgKeyIDSetting = "GPG_KEY_ID" + PgpKeySetting = "WALG_PGP_KEY" + PgpKeyPathSetting = "WALG_PGP_KEY_PATH" + PgpKeyPassphraseSetting = "WALG_PGP_KEY_PASSPHRASE" + PgDataSetting = "PGDATA" + UserSetting = "USER" // TODO : do something with it + PgPortSetting = "PGPORT" + PgUserSetting = "PGUSER" + PgHostSetting = "PGHOST" + PgPasswordSetting = "PGPASSWORD" + PgPassfileSetting = "PGPASSFILE" + PgDatabaseSetting = "PGDATABASE" + PgSslModeSetting = "PGSSLMODE" + PgSlotName = "WALG_SLOTNAME" + PgWalSize = "WALG_PG_WAL_SIZE" + TotalBgUploadedLimit = "TOTAL_BG_UPLOADED_LIMIT" + NameStreamCreateCmd = "WALG_STREAM_CREATE_COMMAND" + NameStreamRestoreCmd = "WALG_STREAM_RESTORE_COMMAND" + MaxDelayedSegmentsCount = "WALG_INTEGRITY_MAX_DELAYED_WALS" + PrefetchDir = "WALG_PREFETCH_DIR" + PgReadyRename = "PG_READY_RENAME" + SerializerTypeSetting = "WALG_SERIALIZER_TYPE" + StreamSplitterPartitions = "WALG_STREAM_SPLITTER_PARTITIONS" + StreamSplitterBlockSize = "WALG_STREAM_SPLITTER_BLOCK_SIZE" + StreamSplitterMaxFileSize = "WALG_STREAM_SPLITTER_MAX_FILE_SIZE" + StatsdAddressSetting = "WALG_STATSD_ADDRESS" + PgAliveCheckInterval = "WALG_ALIVE_CHECK_INTERVAL" + PgStopBackupTimeout = "WALG_STOP_BACKUP_TIMEOUT" + PgFailoverStorages = "WALG_FAILOVER_STORAGES" + PgFailoverStoragesCheckTimeout = "WALG_FAILOVER_STORAGES_CHECK_TIMEOUT" + PgFailoverStorageCacheLifetime = "WALG_FAILOVER_STORAGES_CACHE_LIFETIME" ProfileSamplingRatio = "PROFILE_SAMPLING_RATIO" ProfileMode = "PROFILE_MODE" @@ -186,29 +191,31 @@ var ( defaultConfigValues map[string]string commonDefaultConfigValues = map[string]string{ - DownloadConcurrencySetting: "10", - UploadConcurrencySetting: "16", - UploadDiskConcurrencySetting: "1", - UploadQueueSetting: "2", - PreventWalOverwriteSetting: "false", - UploadWalMetadata: "NOMETADATA", - DeltaMaxStepsSetting: "0", - CompressionMethodSetting: "lz4", - UseWalDeltaSetting: "false", - TarSizeThresholdSetting: "1073741823", // (1 << 30) - 1 - TarDisableFsyncSetting: "false", - TotalBgUploadedLimit: "32", - UseReverseUnpackSetting: "false", - 
SkipRedundantTarsSetting: "false", - VerifyPageChecksumsSetting: "false", - StoreAllCorruptBlocksSetting: "false", - UseRatingComposerSetting: "false", - UseCopyComposerSetting: "false", - UseDatabaseComposerSetting: "false", - WithoutFilesMetadataSetting: "false", - MaxDelayedSegmentsCount: "0", - SerializerTypeSetting: "json_default", - LibsodiumKeyTransform: "none", + DownloadConcurrencySetting: "10", + UploadConcurrencySetting: "16", + UploadDiskConcurrencySetting: "1", + UploadQueueSetting: "2", + PreventWalOverwriteSetting: "false", + UploadWalMetadata: "NOMETADATA", + DeltaMaxStepsSetting: "0", + CompressionMethodSetting: "lz4", + UseWalDeltaSetting: "false", + TarSizeThresholdSetting: "1073741823", // (1 << 30) - 1 + TarDisableFsyncSetting: "false", + TotalBgUploadedLimit: "32", + UseReverseUnpackSetting: "false", + SkipRedundantTarsSetting: "false", + VerifyPageChecksumsSetting: "false", + StoreAllCorruptBlocksSetting: "false", + UseRatingComposerSetting: "false", + UseCopyComposerSetting: "false", + UseDatabaseComposerSetting: "false", + WithoutFilesMetadataSetting: "false", + MaxDelayedSegmentsCount: "0", + SerializerTypeSetting: "json_default", + LibsodiumKeyTransform: "none", + PgFailoverStoragesCheckTimeout: "30s", + PgFailoverStorageCacheLifetime: "15m", } MongoDefaultSettings = map[string]string{ @@ -377,21 +384,24 @@ var ( PGAllowedSettings = map[string]bool{ // Postgres - PgPortSetting: true, - PgUserSetting: true, - PgHostSetting: true, - PgDataSetting: true, - PgPasswordSetting: true, - PgPassfileSetting: true, - PgDatabaseSetting: true, - PgSslModeSetting: true, - PgSlotName: true, - PgWalSize: true, - PrefetchDir: true, - PgReadyRename: true, - PgBackRestStanza: true, - PgAliveCheckInterval: true, - PgStopBackupTimeout: true, + PgPortSetting: true, + PgUserSetting: true, + PgHostSetting: true, + PgDataSetting: true, + PgPasswordSetting: true, + PgPassfileSetting: true, + PgDatabaseSetting: true, + PgSslModeSetting: true, + PgSlotName: true, + PgWalSize: true, + PrefetchDir: true, + PgReadyRename: true, + PgBackRestStanza: true, + PgAliveCheckInterval: true, + PgStopBackupTimeout: true, + PgFailoverStorages: true, + PgFailoverStoragesCheckTimeout: true, + PgFailoverStorageCacheLifetime: true, } MongoAllowedSettings = map[string]bool{ @@ -485,6 +495,10 @@ var ( SSHPassword: true, SwiftOsPassword: true, } + + complexSettings = map[string]bool{ + PgFailoverStorages: true, + } ) func AddTurboFlag(cmd *cobra.Command) { @@ -809,6 +823,10 @@ func bindConfigToEnv(globalViper *viper.Viper) { continue } + if complexSettings[k] { + continue + } + err := os.Setenv(k, val) if err != nil { err = errors.Wrap(err, "Failed to bind config to env variable") @@ -816,3 +834,31 @@ func bindConfigToEnv(globalViper *viper.Viper) { } } } + +func InitFailoverStorages() (res map[string]storage.Folder, err error) { + storages := viper.GetStringMap(PgFailoverStorages) + + if len(storages) == 0 { + return nil, nil + } + + res = make(map[string]storage.Folder, 0) + for name := range storages { + if name == "default" { + return nil, fmt.Errorf("'%s' storage name is reserved", name) + } + cfg := viper.Sub(PgFailoverStorages + "." 
+ name) + folder, err := ConfigureFolderForSpecificConfig(cfg) + if err != nil { + return nil, fmt.Errorf("failover storage %s: %v", name, err) + } + if limiters.NetworkLimiter != nil { + folder = NewLimitedFolder(folder, limiters.NetworkLimiter) + } + + folder = ConfigureStoragePrefix(folder) + res[name] = folder + } + + return res, nil +} diff --git a/internal/configure.go b/internal/configure.go index 9f3eeefb5..078a9fc20 100644 --- a/internal/configure.go +++ b/internal/configure.go @@ -238,7 +238,7 @@ func ConfigurePGArchiveStatusManager() (fsutil.DataFolder, error) { // ConfigureUploader connects to storage and creates an uploader. It makes sure // that a valid session has started; if invalid, returns AWS error // and `<nil>` values. -func ConfigureUploader() (uploader Uploader, err error) { +func ConfigureUploader() (uploader *RegularUploader, err error) { folder, err := ConfigureFolder() if err != nil { return nil, errors.Wrap(err, "failed to configure folder") @@ -249,7 +249,7 @@ func ConfigureUploader() (uploader Uploader, err error) { return nil, errors.Wrap(err, "failed to configure compression") } - uploader = NewUploader(compressor, folder) + uploader = NewRegularUploader(compressor, folder) return uploader, err } @@ -259,7 +259,7 @@ func ConfigureUploaderWithoutCompressor() (uploader Uploader, err error) { return nil, errors.Wrap(err, "failed to configure folder") } - uploader = NewUploader(nil, folder) + uploader = NewRegularUploader(nil, folder) return uploader, err } diff --git a/internal/databases/mongo/archive/loader_test.go b/internal/databases/mongo/archive/loader_test.go index 1f44515c9..0a4a2d3bc 100644 --- a/internal/databases/mongo/archive/loader_test.go +++ b/internal/databases/mongo/archive/loader_test.go @@ -31,7 +31,7 @@ func TestStorageUploader_UploadOplogArchive_ProperInterfaces(t *testing.T) { return nil }) - uploaderProv := internal.NewUploader(compression.Compressors[lz4.AlgorithmName], storageProv) + uploaderProv := internal.NewRegularUploader(compression.Compressors[lz4.AlgorithmName], storageProv) su := NewStorageUploader(uploaderProv) r, w := io.Pipe() go func() { diff --git a/internal/databases/sqlserver/backup_export_handler.go b/internal/databases/sqlserver/backup_export_handler.go index 8dd8ec311..082eb3f28 100644 --- a/internal/databases/sqlserver/backup_export_handler.go +++ b/internal/databases/sqlserver/backup_export_handler.go @@ -60,7 +60,7 @@ func HandleBackupExport(externalConfig string, exportPrefixes map[string]string) tracelog.ErrorLogger.FatalfOnError("overall export failed: %v", err) sentinel.Databases = uniq(append(sentinel.Databases, dbnames...)) - uploader := internal.NewUploader(nil, folder.GetSubFolder(utility.BaseBackupPath)) + uploader := internal.NewRegularUploader(nil, folder.GetSubFolder(utility.BaseBackupPath)) tracelog.InfoLogger.Printf("uploading sentinel: %s", sentinel) err = internal.UploadSentinel(uploader, sentinel, backupName) tracelog.ErrorLogger.FatalfOnError("failed to save sentinel: %v", err) diff --git a/internal/databases/sqlserver/backup_import_handler.go b/internal/databases/sqlserver/backup_import_handler.go index 5d2a51364..e1598f0e0 100644 --- a/internal/databases/sqlserver/backup_import_handler.go +++ b/internal/databases/sqlserver/backup_import_handler.go @@ -95,7 +95,7 @@ func HandleBackupImport(externalConfig string, importDatabases map[string]string tracelog.ErrorLogger.FatalfOnError("overall import failed: %v", err) sentinel.Databases = uniq(append(sentinel.Databases, dbnames...)) - uploader :=
internal.NewUploader(nil, folder.GetSubFolder(utility.BaseBackupPath)) + uploader := internal.NewRegularUploader(nil, folder.GetSubFolder(utility.BaseBackupPath)) tracelog.InfoLogger.Printf("uploading sentinel: %s", sentinel) err = internal.UploadSentinel(uploader, sentinel, backupName) tracelog.ErrorLogger.FatalfOnError("failed to save sentinel: %v", err) diff --git a/internal/databases/sqlserver/backup_push_handler.go b/internal/databases/sqlserver/backup_push_handler.go index f935ca211..a7486de6d 100644 --- a/internal/databases/sqlserver/backup_push_handler.go +++ b/internal/databases/sqlserver/backup_push_handler.go @@ -63,7 +63,7 @@ func HandleBackupPush(dbnames []string, updateLatest bool) { if !updateLatest { sentinel.StopLocalTime = utility.TimeNowCrossPlatformLocal() } - uploader := internal.NewUploader(nil, folder.GetSubFolder(utility.BaseBackupPath)) + uploader := internal.NewRegularUploader(nil, folder.GetSubFolder(utility.BaseBackupPath)) tracelog.InfoLogger.Printf("uploading sentinel: %s", sentinel) err = internal.UploadSentinel(uploader, sentinel, backupName) tracelog.ErrorLogger.FatalfOnError("failed to save sentinel: %v", err) diff --git a/internal/multistorage/alive_checker.go b/internal/multistorage/alive_checker.go new file mode 100644 index 000000000..8efbf6b68 --- /dev/null +++ b/internal/multistorage/alive_checker.go @@ -0,0 +1,74 @@ +package multistorage + +import ( + "context" + "fmt" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" +) + +// TODO: Unit tests +func FindAliveStorages(toCheck []FailoverFolder, stopOnDefaultOk bool) (ok []FailoverFolder, err error) { + checkTimeout, err := internal.GetDurationSetting(internal.PgFailoverStoragesCheckTimeout) + if err != nil { + return nil, fmt.Errorf("check timeout setting: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), checkTimeout) + defer cancel() + + okFolderCh := make(chan FailoverFolder, len(toCheck)) + errCh := make(chan error, len(toCheck)) + + for idx := range toCheck { + i := idx + go func() { + err := checkStorageAlive(ctx, toCheck[i]) + if err != nil { + errCh <- fmt.Errorf("storage '%s' read check: %v", toCheck[i].Name, err) + return + } + + if toCheck[i].Name == DefaultStorage && stopOnDefaultOk { + // stop checking other storages if default is OK + cancel() + } + + okFolderCh <- toCheck[i] + }() + } + + checkedCount := 0 + for checkedCount < len(toCheck) { + select { + case okFolder := <-okFolderCh: + ok = append(ok, okFolder) + case err := <-errCh: + tracelog.ErrorLogger.Print(err) + } + checkedCount++ + } + + if len(ok) == 0 { + return nil, fmt.Errorf("no readable storages found, all %d failed", checkedCount) + } + + return ok, nil +} + +func checkStorageAlive(ctx context.Context, folder FailoverFolder) error { + // currently, we use simple ListFolder() call to check if storage is up and reachable + errCh := make(chan error, 1) + go func() { + _, _, err := folder.ListFolder() + errCh <- err + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + return fmt.Errorf("storage '%s' read check timeout", folder.Name) + } +} diff --git a/internal/multistorage/alive_storage_cache.go b/internal/multistorage/alive_storage_cache.go new file mode 100644 index 000000000..0fd49b7a5 --- /dev/null +++ b/internal/multistorage/alive_storage_cache.go @@ -0,0 +1,123 @@ +package multistorage + +import ( + "encoding/json" + "fmt" + "os" + "os/user" + "path/filepath" + "sync" + "time" + + "github.com/wal-g/wal-g/internal" + + "github.com/wal-g/tracelog" +) + 
+var storageCacheMu = &sync.RWMutex{} + +type aliveStorageCache struct { + LastGoodStorage string `json:"last_ok_storage"` + UpdTS time.Time `json:"upd_ts"` + Found bool `json:"-"` + Lifetime time.Duration `json:"-"` +} + +const cacheFileName = ".walg_failover_storage_cache" + +var storageCache *aliveStorageCache + +func initStorageCache() error { + cacheLifetime, err := internal.GetDurationSetting(internal.PgFailoverStorageCacheLifetime) + if err != nil { + return fmt.Errorf("cache lifetime setting: %v", err) + } + + if storageCache == nil { + storageCacheMu.Lock() + defer storageCacheMu.Unlock() + if storageCache == nil { + cache, err := readState() + if err != nil { + tracelog.DebugLogger.Printf("Reading storage cache: %v", err) + // ignore the error and continue + } else { + cache.Found = true + } + cache.Lifetime = cacheLifetime + storageCache = &cache + } + } + return nil +} + +func (c *aliveStorageCache) IsActual() bool { + storageCacheMu.RLock() + defer storageCacheMu.RUnlock() + + return c.Found && time.Now().Before(c.UpdTS.Add(c.Lifetime)) +} + +func (c *aliveStorageCache) Copy() aliveStorageCache { + storageCacheMu.RLock() + defer storageCacheMu.RUnlock() + + return *c +} + +func (c *aliveStorageCache) Update(storage string) { + if storageCache.IsActual() { + // too early to update + return + } + storageCacheMu.Lock() + defer storageCacheMu.Unlock() + + storageCache.LastGoodStorage = storage + storageCache.UpdTS = time.Now() + + err := storageCache.writeState() + if err != nil { + tracelog.DebugLogger.Printf("Writing storage cache: %v", err) + } +} + +func (c *aliveStorageCache) writeState() error { + usr, err := user.Current() + + if err != nil { + return err + } + + tracelog.DebugLogger.Printf("Writing storage cache: %v", c) + cacheFilename := filepath.Join(usr.HomeDir, cacheFileName) + marshal, err := json.Marshal(c) + if err != nil { + return err + } + + return os.WriteFile(cacheFilename, marshal, 0644) +} + +func readState() (aliveStorageCache, error) { + var cache aliveStorageCache + + usr, err := user.Current() + if err != nil { + return aliveStorageCache{}, err + } + + cacheFilename := filepath.Join(usr.HomeDir, cacheFileName) + file, err := os.ReadFile(cacheFilename) + if err != nil { + return aliveStorageCache{}, err + } + + err = json.Unmarshal(file, &cache) + if err != nil { + return aliveStorageCache{}, err + } + + tracelog.DebugLogger.Printf("Reading storage cache: %v", cache) + return cache, nil +} diff --git a/internal/multistorage/executer.go b/internal/multistorage/executer.go new file mode 100644 index 000000000..bc841362d --- /dev/null +++ b/internal/multistorage/executer.go @@ -0,0 +1,72 @@ +package multistorage + +import ( + "fmt" + "strings" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func executeOnAllStorages(fn func(folder storage.Folder) error) error { + failover, err := internal.InitFailoverStorages() + if err != nil { + return err + } + + folder, err := internal.ConfigureFolder() + if err != nil { + return err + } + toRun := NewFailoverFolders(folder, failover) + + atLeastOneOK := false + for _, f := range toRun { + tracelog.InfoLogger.Printf("storage %s", f.Name) + err := fn(f) + tracelog.ErrorLogger.PrintOnError(err) + if err == nil { + atLeastOneOK = true + } + } + + if !atLeastOneOK { + return fmt.Errorf("all storages failed") + } + + return nil +} + +func ExecuteOnStorage(target string, fn func(folder storage.Folder) error) error { + if target == DefaultStorage { + 
folder, err := internal.ConfigureFolder() + if err != nil { + return err + } + + return fn(folder) + } + + if target == "all" { + return executeOnAllStorages(fn) + } + + failover, err := internal.InitFailoverStorages() + if err != nil { + return err + } + + for name := range failover { + if target != name { + continue + } + return fn(failover[name]) + } + + available := []string{DefaultStorage} + for name := range failover { + available = append(available, name) + } + return fmt.Errorf("target storage '%s' not found, available storages: %v", target, strings.Join(available, ", ")) +} diff --git a/internal/multistorage/failover_folder.go b/internal/multistorage/failover_folder.go new file mode 100644 index 000000000..e1181c1d9 --- /dev/null +++ b/internal/multistorage/failover_folder.go @@ -0,0 +1,41 @@ +package multistorage + +import ( + "sort" + + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +const DefaultStorage = "default" + +type FailoverFolder struct { + storage.Folder + Name string +} + +func NewDefaultFailoverFolder(folder storage.Folder) FailoverFolder { + return FailoverFolder{ + Folder: folder, + Name: DefaultStorage, + } +} + +func NewFailoverFolders(base storage.Folder, failovers map[string]storage.Folder) (storages []FailoverFolder) { + storages = append(storages, FailoverFolder{ + Folder: base, + Name: DefaultStorage, + }) + + for name, folder := range failovers { + storages = append(storages, FailoverFolder{ + Folder: folder, + Name: name, + }) + } + + sort.Slice(storages, func(i, j int) bool { + return storages[i].Name == DefaultStorage || storages[i].Name < storages[j].Name + }) + + return storages +} diff --git a/internal/multistorage/folder_reader.go b/internal/multistorage/folder_reader.go new file mode 100644 index 000000000..4fa0b68af --- /dev/null +++ b/internal/multistorage/folder_reader.go @@ -0,0 +1,132 @@ +package multistorage + +import ( + "fmt" + "io" + "sync" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func NewStorageFolderReader(mainFolder storage.Folder, failover map[string]storage.Folder) (internal.StorageFolderReader, error) { + if len(failover) == 0 { + return internal.NewFolderReader(mainFolder), nil + } + + err := initStorageCache() + if err != nil { + return nil, err + } + + failoverFolders := NewFailoverFolders(mainFolder, failover) + + folder, ok, err := FindCachedStorage(failoverFolders) + if err != nil { + return nil, err + } + + if !ok { + // if no cached, use default + folder = NewDefaultFailoverFolder(mainFolder) + } + + leftover := make([]FailoverFolder, 0) + for i := range failoverFolders { + if failoverFolders[i].Name != folder.Name { + leftover = append(leftover, failoverFolders[i]) + } + } + + return &StorageFolderReader{ + main: folder, + failover: leftover, + }, nil +} + +type StorageFolderReader struct { + main FailoverFolder + + failover []FailoverFolder + aliveSearchComplete bool + mu sync.Mutex +} + +func (sfr *StorageFolderReader) ReadObject(objectRelativePath string) (io.ReadCloser, error) { + // at first, try to read from the default storage, + // if failed, check for any alive failover storages + + readCloser, originalErr := sfr.main.ReadObject(objectRelativePath) + if originalErr == nil { + return readCloser, nil + } + + err := sfr.initFailoverStorages() + if err != nil { + tracelog.ErrorLogger.Printf("failover storages init failed: %v", err) + return nil, originalErr + } + + errors := []error{originalErr} + for i := range sfr.failover { + readCloser,
err := sfr.failover[i].ReadObject(objectRelativePath) + if err == nil { + storageCache.Update(sfr.failover[i].Name) + tracelog.WarningLogger.Printf("will read '%s' from failover storage '%s'", + objectRelativePath, sfr.failover[i].Name) + return readCloser, nil + } + errors = append(errors, err) + + tracelog.DebugLogger.Printf("failover storage '%s', reading object '%s': %v", + sfr.failover[i].Name, objectRelativePath, err) + } + + for i := range errors { + if _, ok := errors[i].(storage.ObjectNotFoundError); !ok { + // if we have at least one error that differs from the regular not found one + // we must return a custom error since we can't say for sure if object exists or not + return nil, fmt.Errorf("object %s unavailable, tried reading from %d storages: %v", + objectRelativePath, len(errors), errors) + } + } + + return nil, storage.NewObjectNotFoundError(objectRelativePath) +} + +func (sfr *StorageFolderReader) initFailoverStorages() error { + sfr.mu.Lock() + defer sfr.mu.Unlock() + + if sfr.aliveSearchComplete { + return nil + } + + aliveFolders, err := FindAliveStorages(sfr.failover, false) + if err != nil { + return err + } + + sfr.failover = aliveFolders + sfr.aliveSearchComplete = true + return nil +} + +func (sfr *StorageFolderReader) SubFolder(subFolderRelativePath string) internal.StorageFolderReader { + sfr.mu.Lock() + defer sfr.mu.Unlock() + var failover []FailoverFolder + + for i := range sfr.failover { + failover = append(failover, FailoverFolder{ + Folder: sfr.failover[i].GetSubFolder(subFolderRelativePath), + Name: sfr.failover[i].Name, + }) + } + + return &StorageFolderReader{ + main: FailoverFolder{sfr.main.GetSubFolder(subFolderRelativePath), sfr.main.Name}, + failover: failover, + } +} diff --git a/internal/multistorage/uploader.go b/internal/multistorage/uploader.go new file mode 100644 index 000000000..38f6c7416 --- /dev/null +++ b/internal/multistorage/uploader.go @@ -0,0 +1,62 @@ +package multistorage + +import ( + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func NewUploader(baseUploader *internal.RegularUploader, failover map[string]storage.Folder) (internal.Uploader, error) { + if len(failover) == 0 { + return baseUploader, nil + } + + failoverFolders := NewFailoverFolders(baseUploader.UploadingFolder, failover) + + err := initStorageCache() + if err != nil { + return nil, err + } + + folder, ok, err := FindCachedStorage(failoverFolders) + if err != nil { + return nil, err + } + + if !ok { + tracelog.DebugLogger.Printf("Cached upload storage not found, will search for an available one") + folder, err = chooseAliveUploadStorage(failoverFolders) + if err != nil { + return nil, err + } + } + + storageCache.Update(folder.Name) + baseUploader.UploadingFolder = folder + tracelog.DebugLogger.Printf("Active uploader is '%s'", folder.Name) + return baseUploader, nil +} + +func chooseAliveUploadStorage(storages []FailoverFolder) (FailoverFolder, error) { + aliveFolders, err := FindAliveStorages(storages, true) + if err != nil { + return FailoverFolder{}, err + } + + return aliveFolders[0], nil +} + +func FindCachedStorage(storages []FailoverFolder) (FailoverFolder, bool, error) { + if storageCache.IsActual() { + ctx := storageCache.Copy() + if ctx.LastGoodStorage != "" { + for i := range storages { + if storages[i].Name == ctx.LastGoodStorage { + return storages[i], true, nil + } + } + } + } + + return FailoverFolder{}, false, nil +} diff --git a/internal/storagetools/cat_object_handler.go 
b/internal/storagetools/cat_object_handler.go index fe186fc16..51c7f828a 100644 --- a/internal/storagetools/cat_object_handler.go +++ b/internal/storagetools/cat_object_handler.go @@ -1,14 +1,17 @@ package storagetools import ( + "fmt" "os" - "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/pkg/storages/storage" ) -func HandleCatObject(objectPath string, folder storage.Folder, decrypt, decompress bool) { +func HandleCatObject(objectPath string, folder storage.Folder, decrypt, decompress bool) error { dstFile := os.Stdout err := downloadObject(objectPath, folder, dstFile, decrypt, decompress) - tracelog.ErrorLogger.FatalfOnError("Failed to download the file: %v", err) + if err != nil { + return fmt.Errorf("download the file: %v", err) + } + return nil } diff --git a/internal/storagetools/check.go b/internal/storagetools/check.go index 239e6d624..d87f00b40 100644 --- a/internal/storagetools/check.go +++ b/internal/storagetools/check.go @@ -12,10 +12,10 @@ import ( "github.com/wal-g/tracelog" ) -func HandleCheckRead(folder storage.Folder, filenames []string) { +func HandleCheckRead(folder storage.Folder, filenames []string) error { _, _, err := folder.ListFolder() if err != nil { - tracelog.ErrorLogger.Fatalf("failed to list the storage: %v", err) + return fmt.Errorf("failed to list the storage: %v", err) } missing := make([]string, 0) for _, name := range filenames { @@ -25,9 +25,10 @@ func HandleCheckRead(folder storage.Folder, filenames []string) { } } if len(missing) > 0 { - tracelog.ErrorLogger.Fatalf("files are missing: %s", strings.Join(missing, ", ")) + return fmt.Errorf("files are missing: %s", strings.Join(missing, ", ")) } tracelog.InfoLogger.Println("Read check OK") + return nil } func randomName(length int) string { @@ -37,13 +38,13 @@ func randomName(length int) string { return fmt.Sprintf("%x", b)[:length] } -func HandleCheckWrite(folder storage.Folder) { +func HandleCheckWrite(folder storage.Folder) error { var filename string for { filename = randomName(32) ok, err := folder.Exists(filename) if err != nil { - tracelog.ErrorLogger.Fatalf("failed to read from the storage: %v", err) + return fmt.Errorf("failed to read from the storage: %v", err) } if !ok { break @@ -54,7 +55,8 @@ func HandleCheckWrite(folder storage.Folder) { tracelog.WarningLogger.Printf("failed to clean temp files, %s left in storage", filename) } if err != nil { - tracelog.ErrorLogger.Fatalf("failed to write to the storage: %v", err) + return fmt.Errorf("failed to write to the storage: %v", err) } tracelog.InfoLogger.Println("Write check OK") + return nil } diff --git a/internal/storagetools/delete_object_handler.go b/internal/storagetools/delete_object_handler.go index bbe25cf07..d79cead96 100644 --- a/internal/storagetools/delete_object_handler.go +++ b/internal/storagetools/delete_object_handler.go @@ -1,17 +1,24 @@ package storagetools import ( - "github.com/wal-g/tracelog" + "fmt" + "github.com/wal-g/wal-g/pkg/storages/storage" ) -func HandleDeleteObject(objectPath string, folder storage.Folder) { +func HandleDeleteObject(objectPath string, folder storage.Folder) error { // some storages may not produce an error on deleting the non-existing object exists, err := folder.Exists(objectPath) - tracelog.ErrorLogger.FatalfOnError("Failed to check object existence: %v", err) + if err != nil { + return fmt.Errorf("check object existence: %v", err) + } + if !exists { - tracelog.ErrorLogger.Fatalf("Object %s does not exist", objectPath) + return fmt.Errorf("object %s does not exist", objectPath) } err = 
folder.DeleteObjects([]string{objectPath}) - tracelog.ErrorLogger.FatalfOnError("Failed to delete the specified object: %v", err) + if err != nil { + return fmt.Errorf("delete the specified object: %v", err) + } + return nil } diff --git a/internal/storagetools/folder_list_handler.go b/internal/storagetools/folder_list_handler.go index 5613c3642..6f90de7c0 100644 --- a/internal/storagetools/folder_list_handler.go +++ b/internal/storagetools/folder_list_handler.go @@ -8,7 +8,6 @@ import ( "text/tabwriter" "time" - "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/pkg/storages/storage" ) @@ -60,7 +59,7 @@ func (ld *ListDirectory) Type() ListElementType { return Directory } -func HandleFolderList(folder storage.Folder, recursive bool) { +func HandleFolderList(folder storage.Folder, recursive bool) error { var list []ListElement var folderObjects []storage.Object var err error @@ -74,14 +73,20 @@ func HandleFolderList(folder storage.Folder, recursive bool) { list = append(list, NewListDirectory(subFolders[i], folder)) } } + if err != nil { + return fmt.Errorf("list folder: %v", err) + } for i := range folderObjects { list = append(list, NewListObject(folderObjects[i])) } - tracelog.ErrorLogger.FatalfOnError("Failed to list the folder: %v", err) err = WriteObjectsList(list, os.Stdout) - tracelog.ErrorLogger.FatalfOnError("Failed to write the folder listing: %v", err) + if err != nil { + return fmt.Errorf("write folder listing: %v", err) + } + + return nil } func WriteObjectsList(objects []ListElement, output io.Writer) error { diff --git a/internal/storagetools/get_object_handler.go b/internal/storagetools/get_object_handler.go index 852c481ff..6901a7e52 100644 --- a/internal/storagetools/get_object_handler.go +++ b/internal/storagetools/get_object_handler.go @@ -2,6 +2,7 @@ package storagetools import ( "errors" + "fmt" "io" "os" "path" @@ -13,20 +14,26 @@ import ( "github.com/wal-g/wal-g/utility" ) -func HandleGetObject(objectPath, dstPath string, folder storage.Folder, decrypt, decompress bool) { +func HandleGetObject(objectPath, dstPath string, folder storage.Folder, decrypt, decompress bool) error { fileName := path.Base(objectPath) targetPath, err := getTargetFilePath(dstPath, fileName) - tracelog.ErrorLogger.FatalfOnError("Failed to determine the destination path: %v", err) + if err != nil { + return fmt.Errorf("determine the destination path: %v", err) + } dstFile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0640) - tracelog.ErrorLogger.FatalfOnError("Failed to open the destination file: %v", err) + if err != nil { + return fmt.Errorf("open the destination file: %v", err) + } err = downloadObject(objectPath, folder, dstFile, decrypt, decompress) dstFile.Close() if err != nil { os.Remove(targetPath) - tracelog.ErrorLogger.Fatalf("Failed to download the file: %v", err) + return fmt.Errorf("download the file: %v", err) } + + return nil } func getTargetFilePath(dstPath string, fileName string) (string, error) { diff --git a/internal/storagetools/put_object_handler.go b/internal/storagetools/put_object_handler.go index 44f001d2a..8c0ecccdd 100644 --- a/internal/storagetools/put_object_handler.go +++ b/internal/storagetools/put_object_handler.go @@ -1,6 +1,7 @@ package storagetools import ( + "fmt" "io" "os" "path/filepath" @@ -10,14 +11,20 @@ import ( "github.com/wal-g/wal-g/utility" - "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/internal" ) -func HandlePutObject(localPath, dstPath string, uploader internal.Uploader, overwrite,
encrypt, compress bool) { - checkOverwrite(dstPath, uploader, overwrite) +func HandlePutObject(localPath, dstPath string, uploader internal.Uploader, overwrite, encrypt, compress bool) error { + err := checkOverwrite(dstPath, uploader, overwrite) + if err != nil { + return fmt.Errorf("check file overwrite: %v", err) + } + + fileReadCloser, err := openLocalFile(localPath) + if err != nil { + return fmt.Errorf("open local file: %v", err) + } - fileReadCloser := openLocalFile(localPath) defer fileReadCloser.Close() storageFolderPath := utility.SanitizePath(filepath.Dir(dstPath)) @@ -26,29 +33,41 @@ func HandlePutObject(localPath, dstPath string, uploader internal.Uploader, over } fileName := utility.SanitizePath(filepath.Base(dstPath)) - err := uploadFile(fileName, fileReadCloser, uploader, encrypt, compress) - tracelog.ErrorLogger.FatalfOnError("Failed to upload: %v", err) + err = uploadFile(fileName, fileReadCloser, uploader, encrypt, compress) + if err != nil { + return fmt.Errorf("upload: %v", err) + } + return nil } -func checkOverwrite(dstPath string, uploader internal.Uploader, overwrite bool) { +func checkOverwrite(dstPath string, uploader internal.Uploader, overwrite bool) error { fullPath := dstPath + "." + uploader.Compression().FileExtension() exists, err := uploader.Folder().Exists(fullPath) - tracelog.ErrorLogger.FatalfOnError("Failed to check object existence: %v", err) + if err != nil { + return fmt.Errorf("check object existence: %v", err) + } if exists && !overwrite { - tracelog.ErrorLogger.Fatalf("Object %s already exists. To overwrite it, add the -f flag.", fullPath) + return fmt.Errorf("object %s already exists. To overwrite it, add the -f flag", fullPath) } + return nil } -func openLocalFile(localPath string) io.ReadCloser { +func openLocalFile(localPath string) (io.ReadCloser, error) { localFile, err := os.Open(localPath) - tracelog.ErrorLogger.FatalfOnError("Could not open the local file: %v", err) + if err != nil { + return nil, fmt.Errorf("open the local file: %v", err) + } + fileInfo, err := localFile.Stat() - tracelog.ErrorLogger.FatalfOnError("Could not Stat() the local file: %v", err) + if err != nil { + return nil, fmt.Errorf("stat() the local file: %v", err) + } + if fileInfo.IsDir() { - tracelog.ErrorLogger.Fatalf("Provided local path (%s) points to a directory, exiting", localPath) + return nil, fmt.Errorf("provided local path (%s) points to a directory, exiting", localPath) } - return localFile + return localFile, nil } func uploadFile(name string, content io.Reader, uploader internal.Uploader, encrypt, compress bool) error { diff --git a/internal/stream_push_helper_test.go b/internal/stream_push_helper_test.go index 6045aa01d..cbb404aca 100644 --- a/internal/stream_push_helper_test.go +++ b/internal/stream_push_helper_test.go @@ -97,7 +97,7 @@ func checkPushAndFetchBackup(t *testing.T, partitions, blockSize, maxFileSize, n compressor := compression.Compressors[compression.CompressingAlgorithms[0]] uploader := &SplitStreamUploader{ - Uploader: NewUploader(compressor, storageFolder), + Uploader: NewRegularUploader(compressor, storageFolder), partitions: partitions, blockSize: blockSize, maxFileSize: maxFileSize, diff --git a/internal/uploader.go b/internal/uploader.go index 9b3403744..4bcba5ef0 100644 --- a/internal/uploader.go +++ b/internal/uploader.go @@ -65,10 +65,10 @@ type UploadObject struct { Content io.Reader } -func NewUploader( +func NewRegularUploader( compressor compression.Compressor, uploadingLocation storage.Folder, -) Uploader { +) 
*RegularUploader { uploader := &RegularUploader{ UploadingFolder: uploadingLocation, Compressor: compressor, diff --git a/testtools/util.go b/testtools/util.go index fba480840..d44d512e5 100644 --- a/testtools/util.go +++ b/testtools/util.go @@ -47,14 +47,14 @@ func MakeDefaultUploader(uploaderAPI s3manageriface.UploaderAPI) *s3.Uploader { func NewMockUploader(apiMultiErr, apiErr bool) internal.Uploader { s3Uploader := MakeDefaultUploader(NewMockS3Uploader(apiMultiErr, apiErr, nil)) - return internal.NewUploader( + return internal.NewRegularUploader( &MockCompressor{}, s3.NewFolder(*s3Uploader, NewMockS3Client(false, true), map[string]string{}, "bucket/", "server/", false), ) } func NewStoringMockUploader(storage *memory.Storage) internal.Uploader { - return internal.NewUploader( + return internal.NewRegularUploader( &MockCompressor{}, memory.NewFolder("in_memory/", storage), ) @@ -62,7 +62,7 @@ func NewStoringMockUploader(storage *memory.Storage) internal.Uploader { func NewMockWalUploader(apiMultiErr, apiErr bool) *postgres.WalUploader { s3Uploader := MakeDefaultUploader(NewMockS3Uploader(apiMultiErr, apiErr, nil)) - upl := internal.NewUploader(&MockCompressor{}, + upl := internal.NewRegularUploader(&MockCompressor{}, s3.NewFolder(*s3Uploader, NewMockS3Client(false, true), map[string]string{}, "bucket/", "server/", false)) return postgres.NewWalUploader( upl, @@ -72,7 +72,7 @@ func NewMockWalUploader(apiMultiErr, apiErr bool) *postgres.WalUploader { func CreateMockStorageWalUploader() internal.Uploader { var folder = MakeDefaultInMemoryStorageFolder() - return internal.NewUploader(&MockCompressor{}, folder.GetSubFolder(utility.WalPath)) + return internal.NewRegularUploader(&MockCompressor{}, folder.GetSubFolder(utility.WalPath)) } func NewMockWalDirUploader(apiMultiErr, apiErr bool) *postgres.WalUploader {
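Taken together, the renames and the new package change how callers assemble an uploader: `internal.NewUploader` becomes `NewRegularUploader`, `ConfigureUploader` now returns the concrete `*RegularUploader`, and `multistorage.NewUploader` wraps it with failover storage selection. A minimal sketch of the resulting wiring, condensed from the `cmd/pg/wal_push.go` changes above (error handling via tracelog, as in the patch; running it requires WAL-G storage settings in the environment):

```go
package main

import (
	"github.com/wal-g/tracelog"
	"github.com/wal-g/wal-g/internal"
	"github.com/wal-g/wal-g/internal/multistorage"
)

func main() {
	// ConfigureUploader now returns the concrete *RegularUploader,
	// which multistorage.NewUploader accepts as its base.
	base, err := internal.ConfigureUploader()
	tracelog.ErrorLogger.FatalOnError(err)

	// InitFailoverStorages returns nil when WALG_FAILOVER_STORAGES is unset;
	// in that case NewUploader hands back the base uploader unchanged.
	failover, err := internal.InitFailoverStorages()
	tracelog.ErrorLogger.FatalOnError(err)

	// Reuses the cached alive storage while the cache is fresh; otherwise
	// probes the configured storages and settles on an alive one.
	uploader, err := multistorage.NewUploader(base, failover)
	tracelog.ErrorLogger.FatalOnError(err)

	_ = uploader // hand off to e.g. postgres.ConfigureWalUploader, as wal-push does
}
```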