Skip to content

Commit

Permalink
[Postgres] Add failover storages for wal-push and wal-fetch (wal-g#1466)
Browse files Browse the repository at this point in the history
  • Loading branch information
usernamedt authored May 11, 2023
1 parent 2ef3cde commit 35987d1
Show file tree
Hide file tree
Showing 39 changed files with 979 additions and 184 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dockertests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jobs:
'make TEST="pg_full_backup_copy_composer_test" pg_integration_test',
'make TEST="pg_full_backup_rating_composer_test" pg_integration_test',
'make TEST="pg_full_backup_database_composer_test" pg_integration_test',
'make TEST="pg_full_backup_failover_storage_test" pg_integration_test',
'make TEST="pg_delete_before_name_find_full_test" pg_integration_test',
'make TEST="pg_delete_retain_full_test" pg_integration_test',
'make TEST="pg_delete_before_time_find_full_test" pg_integration_test',
Expand Down
9 changes: 5 additions & 4 deletions cmd/common/st/cat_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package st
import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/internal/storagetools"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

const (
Expand All @@ -22,10 +23,10 @@ var catObjectCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
objectPath := args[0]

folder, err := internal.ConfigureFolder()
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandleCatObject(objectPath, folder, decrypt, decompress)
})
tracelog.ErrorLogger.FatalOnError(err)

storagetools.HandleCatObject(objectPath, folder, decrypt, decompress)
},
}

Expand Down
13 changes: 8 additions & 5 deletions cmd/common/st/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package st
import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/internal/storagetools"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

var checkCmd = &cobra.Command{
Expand All @@ -17,9 +18,10 @@ var checkReadCmd = &cobra.Command{
Short: "check read access to the storage",
Args: cobra.MinimumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandleCheckRead(folder, args)
})
tracelog.ErrorLogger.FatalOnError(err)
storagetools.HandleCheckRead(folder, args)
},
}

Expand All @@ -28,9 +30,10 @@ var checkWriteCmd = &cobra.Command{
Short: "check write access to the storage",
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandleCheckWrite(folder)
})
tracelog.ErrorLogger.FatalOnError(err)
storagetools.HandleCheckWrite(folder)
},
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/common/st/delete_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package st
import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/internal/storagetools"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

const deleteObjectShortDescription = "Delete the specified storage object"
Expand All @@ -15,10 +16,10 @@ var deleteObjectCmd = &cobra.Command{
Short: deleteObjectShortDescription,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandleDeleteObject(args[0], folder)
})
tracelog.ErrorLogger.FatalOnError(err)

storagetools.HandleDeleteObject(args[0], folder)
},
}

Expand Down
16 changes: 8 additions & 8 deletions cmd/common/st/folder_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package st
import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/internal/storagetools"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

const folderListShortDescription = "Prints objects in the provided storage folder"
Expand All @@ -17,14 +18,13 @@ var folderListCmd = &cobra.Command{
Short: folderListShortDescription,
Args: cobra.RangeArgs(0, 1),
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
if len(args) > 0 {
folder = folder.GetSubFolder(args[0])
}
return storagetools.HandleFolderList(folder, recursive)
})
tracelog.ErrorLogger.FatalOnError(err)

if len(args) > 0 {
folder = folder.GetSubFolder(args[0])
}

storagetools.HandleFolderList(folder, recursive)
},
}

Expand Down
13 changes: 9 additions & 4 deletions cmd/common/st/get_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package st
import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/internal/storagetools"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

const (
Expand All @@ -23,10 +24,14 @@ var getObjectCmd = &cobra.Command{
objectPath := args[0]
dstPath := args[1]

folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
if targetStorage == "all" {
tracelog.ErrorLogger.Fatalf("'all' target is not supported for st get command")
}

storagetools.HandleGetObject(objectPath, dstPath, folder, !noDecrypt, !noDecompress)
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandleGetObject(objectPath, dstPath, folder, !noDecrypt, !noDecompress)
})
tracelog.ErrorLogger.FatalOnError(err)
},
}

Expand Down
7 changes: 6 additions & 1 deletion cmd/common/st/put_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/internal/storagetools"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

const (
Expand All @@ -28,7 +30,10 @@ var putObjectCmd = &cobra.Command{
localPath := args[0]
dstPath := args[1]

storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, !noEncrypt, !noCompress)
err = multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
return storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, !noEncrypt, !noCompress)
})
tracelog.ErrorLogger.FatalOnError(err)
},
}

Expand Down
7 changes: 7 additions & 0 deletions cmd/common/st/st.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package st

import (
"github.com/spf13/cobra"
"github.com/wal-g/wal-g/internal/multistorage"
)

// Storage tools allows to interact with the configured storage, e.g.:
Expand All @@ -20,4 +21,10 @@ var (
Long: "Storage tools allows to interact with the configured storage. " +
"Be aware that this command can do potentially harmful operations and make sure that you know what you're doing.",
}
targetStorage string
)

func init() {
StorageToolsCmd.PersistentFlags().StringVarP(&targetStorage, "target", "t", multistorage.DefaultStorage,
"execute for specific failover storage (Postgres only)")
}
10 changes: 9 additions & 1 deletion cmd/pg/wal_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/internal/multistorage"
)

const WalFetchShortDescription = "Fetches a WAL file from storage"
Expand All @@ -17,7 +18,14 @@ var walFetchCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
postgres.HandleWALFetch(internal.NewFolderReader(folder), args[0], args[1], true)

failover, err := internal.InitFailoverStorages()
tracelog.ErrorLogger.FatalOnError(err)

folderReader, err := multistorage.NewStorageFolderReader(folder, failover)
tracelog.ErrorLogger.FatalOnError(err)

postgres.HandleWALFetch(folderReader, args[0], args[1], true)
},
}

Expand Down
10 changes: 9 additions & 1 deletion cmd/pg/wal_prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/internal/multistorage"
)

const WalPrefetchShortDescription = `Used for prefetching process forking
Expand All @@ -19,7 +20,14 @@ var WalPrefetchCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
postgres.HandleWALPrefetch(internal.NewFolderReader(folder), args[0], args[1])

failover, err := internal.InitFailoverStorages()
tracelog.ErrorLogger.FatalOnError(err)

folderReader, err := multistorage.NewStorageFolderReader(folder, failover)
tracelog.ErrorLogger.FatalOnError(err)

postgres.HandleWALPrefetch(folderReader, args[0], args[1])
},
}

Expand Down
21 changes: 14 additions & 7 deletions cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/asm"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/internal/multistorage"
"github.com/wal-g/wal-g/utility"
)

Expand All @@ -20,27 +21,33 @@ var walPushCmd = &cobra.Command{
baseUploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

uploader, err := postgres.ConfigureWalUploader(baseUploader)
failover, err := internal.InitFailoverStorages()
tracelog.ErrorLogger.FatalOnError(err)

uploader, err := multistorage.NewUploader(baseUploader, failover)
tracelog.ErrorLogger.FatalOnError(err)

walUploader, err := postgres.ConfigureWalUploader(uploader)
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
if err == nil {
uploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager)
walUploader.ArchiveStatusManager = asm.NewDataFolderASM(archiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.ArchiveStatusManager = asm.NewNopASM()
walUploader.ArchiveStatusManager = asm.NewNopASM()
}

PGArchiveStatusManager, err := internal.ConfigurePGArchiveStatusManager()
if err == nil {
uploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager)
walUploader.PGArchiveStatusManager = asm.NewDataFolderASM(PGArchiveStatusManager)
} else {
tracelog.ErrorLogger.PrintError(err)
uploader.PGArchiveStatusManager = asm.NewNopASM()
walUploader.PGArchiveStatusManager = asm.NewNopASM()
}

uploader.ChangeDirectory(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
walUploader.ChangeDirectory(utility.WalPath)
err = postgres.HandleWALPush(walUploader, args[0])
tracelog.ErrorLogger.FatalOnError(err)
},
}
Expand Down
12 changes: 12 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ services:
&& mkdir -p /export/fullcopycomposerbucket
&& mkdir -p /export/fullwithoutfilesmetadatabucket
&& mkdir -p /export/fullscandeltabucket
&& mkdir -p /export/failoverstoragebucket
&& mkdir -p /export/partialbucket
&& mkdir -p /export/remotebucket
&& mkdir -p /export/remotewithoutfilesmetadatabucket
Expand Down Expand Up @@ -269,6 +270,17 @@ services:
links:
- s3

pg_full_backup_failover_storage_test:
build:
dockerfile: docker/pg_tests/Dockerfile_full_backup_failover_storages_test
context: .
image: wal-g/full_backup_failover_storage_test
container_name: wal-g_pg_full_backup_failover_storage_test
depends_on:
- s3
links:
- s3

pg_full_backup_streamed_test:
build:
dockerfile: docker/pg_tests/Dockerfile_full_backup_streamed_test
Expand Down
3 changes: 3 additions & 0 deletions docker/pg_tests/Dockerfile_full_backup_failover_storages_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM wal-g/docker_prefix:latest

CMD su postgres -c "/tmp/tests/full_backup_failover_storages_test.sh"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"WALE_S3_PREFIX": "s3://failoverstoragebucket",
"WALG_DELTA_MAX_STEPS": "6",
"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY",
"WALG_LOG_LEVEL": "DEVEL"
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"WALE_S3_PREFIX": "s3://nonexistingbucket",
"WALG_DELTA_MAX_STEPS": "6",
"WALG_LOG_LEVEL": "DEVEL",
"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY",
"WALG_FAILOVER_STORAGES": {
"not_working_failover": {
"AWS_SECRET_ACCESS_KEY": "useless_access_key",
"AWS_ACCESS_KEY_ID": "incorrect_access_key_id",
"WALE_S3_PREFIX": "s3://some-useless-s3-prefix"
},
"good_failover": {
"AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE",
"WALE_S3_PREFIX": "s3://failoverstoragebucket"
}
},
"WALG_FAILOVER_STORAGES_CHECK_TIMEOUT": "5s"
Loading

0 comments on commit 35987d1

Please sign in to comment.