Skip to content

Commit

Permalink
Refactor storage abstractions (wal-g#1467)
Browse files Browse the repository at this point in the history
This is a preliminary PR for wal-g#1466. It refactors some storage abstractions:

- Finally switch to the Uploader interface instead of the plain struct
- Introduce an interface for read-only storage access, StorageReader, which is an analog of the Uploader interface
  • Loading branch information
usernamedt authored May 3, 2023
1 parent c50db95 commit 5fad985
Show file tree
Hide file tree
Showing 60 changed files with 304 additions and 285 deletions.
4 changes: 2 additions & 2 deletions cmd/pg/backup_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ var (
Long: BackupMarkLongDescription,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
uploader, err := postgres.ConfigureWalUploader()
uploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)
internal.HandleBackupMark(uploader.Uploader, args[0], !toImpermanent, postgres.NewGenericMetaInteractor())
internal.HandleBackupMark(uploader, args[0], !toImpermanent, postgres.NewGenericMetaInteractor())
},
}
toImpermanent = false
Expand Down
7 changes: 5 additions & 2 deletions cmd/pg/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ var daemonCmd = &cobra.Command{
Short: DaemonShortDescription, // TODO : improve description
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
uploader, err := postgres.ConfigureWalUploader()
baseUploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

uploader, err := postgres.ConfigureWalUploader(baseUploader)
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
Expand All @@ -35,7 +38,7 @@ var daemonCmd = &cobra.Command{
tracelog.ErrorLogger.PrintError(err)
uploader.PGArchiveStatusManager = asm.NewNopASM()
}
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
uploader.ChangeDirectory(utility.WalPath)
postgres.HandleDaemon(uploader, args[0])
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/pg/wal_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var walFetchCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
postgres.HandleWALFetch(folder, args[0], args[1], true)
postgres.HandleWALFetch(internal.NewFolderReader(folder), args[0], args[1], true)
},
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/pg/wal_prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pg
import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/postgres"
)

Expand All @@ -16,9 +17,9 @@ var WalPrefetchCmd = &cobra.Command{
Args: cobra.ExactArgs(2),
Hidden: true,
Run: func(cmd *cobra.Command, args []string) {
uploader, err := postgres.ConfigureWalUploaderWithoutCompressMethod()
folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
postgres.HandleWALPrefetch(uploader, args[0], args[1])
postgres.HandleWALPrefetch(internal.NewFolderReader(folder), args[0], args[1])
},
}

Expand Down
7 changes: 5 additions & 2 deletions cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ var walPushCmd = &cobra.Command{
Short: WalPushShortDescription, // TODO : improve description
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
uploader, err := postgres.ConfigureWalUploader()
baseUploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

uploader, err := postgres.ConfigureWalUploader(baseUploader)
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
Expand All @@ -36,7 +39,7 @@ var walPushCmd = &cobra.Command{
uploader.PGArchiveStatusManager = asm.NewNopASM()
}

uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)
uploader.ChangeDirectory(utility.WalPath)
err = postgres.HandleWALPush(uploader, args[0])
tracelog.ErrorLogger.FatalOnError(err)
},
Expand Down
5 changes: 4 additions & 1 deletion cmd/pg/wal_receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ var walReceiveCmd = &cobra.Command{
Short: walReceiveShortDescription,
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
uploader, err := postgres.ConfigureWalUploader()
baseUploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

uploader, err := postgres.ConfigureWalUploader(baseUploader)
tracelog.ErrorLogger.FatalOnError(err)

archiveStatusManager, err := internal.ConfigureArchiveStatusManager()
Expand Down
4 changes: 2 additions & 2 deletions cmd/redis/backup_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var backupPushCmd = &cobra.Command{
tracelog.ErrorLogger.FatalOnError(err)

// Configure folder
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath)
uploader.ChangeDirectory(utility.BaseBackupPath)

backupCmd, err := internal.GetCommandSettingContext(ctx, internal.NameStreamCreateCmd)
tracelog.ErrorLogger.FatalOnError(err)
Expand All @@ -50,7 +50,7 @@ var backupPushCmd = &cobra.Command{
backupCmd.Env = append(backupCmd.Env, fmt.Sprintf("REDISCLI_AUTH=%s", redisPassword))
}
backupCmd.Stderr = os.Stderr
metaConstructor := archive.NewBackupRedisMetaConstructor(ctx, uploader.UploadingFolder, permanent)
metaConstructor := archive.NewBackupRedisMetaConstructor(ctx, uploader.Folder(), permanent)

err = redis.HandleBackupPush(uploader, backupCmd, metaConstructor)
tracelog.ErrorLogger.FatalfOnError("Redis backup creation failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func GetBackupByName(backupName, subfolder string, folder storage.Folder) (Backu
}

// TODO : unit tests
func UploadSentinel(uploader UploaderProvider, sentinelDto interface{}, backupName string) error {
func UploadSentinel(uploader Uploader, sentinelDto interface{}, backupName string) error {
sentinelName := SentinelNameFromBackup(backupName)
return UploadDto(uploader.Folder(), sentinelDto, sentinelName)
}
Expand Down
12 changes: 2 additions & 10 deletions internal/backup_mark_handler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
package internal

import (
"github.com/wal-g/wal-g/utility"
)

func HandleBackupMark(uploader *Uploader, backupName string, toPermanent bool, metaInteractor GenericMetaInteractor) {
folder := uploader.UploadingFolder
baseBackupFolder := uploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath)
uploader.UploadingFolder = baseBackupFolder

markHandler := NewBackupMarkHandler(metaInteractor, folder)
func HandleBackupMark(uploader Uploader, backupName string, toPermanent bool, metaInteractor GenericMetaInteractor) {
markHandler := NewBackupMarkHandler(metaInteractor, uploader.Folder())
markHandler.MarkBackup(backupName, toPermanent)
}
22 changes: 16 additions & 6 deletions internal/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ type UnknownCompressionMethodError struct {
error
}

func newUnknownCompressionMethodError() UnknownCompressionMethodError {
func newUnknownCompressionMethodError(method string) UnknownCompressionMethodError {
return UnknownCompressionMethodError{
errors.Errorf("Unknown compression method, supported methods are: %v",
compression.CompressingAlgorithms)}
errors.Errorf("Unknown compression method: '%s', supported methods are: %v",
method, compression.CompressingAlgorithms)}
}

func (err UnknownCompressionMethodError) Error() string {
Expand Down Expand Up @@ -202,7 +202,7 @@ func GetPgSlotName() (pgSlotName string) {
func ConfigureCompressor() (compression.Compressor, error) {
compressionMethod := viper.GetString(CompressionMethodSetting)
if _, ok := compression.Compressors[compressionMethod]; !ok {
return nil, newUnknownCompressionMethodError()
return nil, newUnknownCompressionMethodError(compressionMethod)
}
return compression.Compressors[compressionMethod], nil
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func ConfigurePGArchiveStatusManager() (fsutil.DataFolder, error) {
// ConfigureUploader connects to storage and creates an uploader. It makes sure
// that a valid session has started; if invalid, returns AWS error
// and `<nil>` values.
func ConfigureUploader() (uploader *Uploader, err error) {
func ConfigureUploader() (uploader Uploader, err error) {
folder, err := ConfigureFolder()
if err != nil {
return nil, errors.Wrap(err, "failed to configure folder")
Expand All @@ -253,7 +253,17 @@ func ConfigureUploader() (uploader *Uploader, err error) {
return uploader, err
}

func ConfigureSplitUploader() (UploaderProvider, error) {
func ConfigureUploaderWithoutCompressor() (uploader Uploader, err error) {
folder, err := ConfigureFolder()
if err != nil {
return nil, errors.Wrap(err, "failed to configure folder")
}

uploader = NewUploader(nil, folder)
return uploader, err
}

func ConfigureSplitUploader() (Uploader, error) {
uploader, err := ConfigureUploader()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/fdb/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type streamSentinelDto struct {
StartLocalTime time.Time
}

func HandleBackupPush(uploader internal.UploaderProvider, backupCmd *exec.Cmd) {
func HandleBackupPush(uploader internal.Uploader, backupCmd *exec.Cmd) {
timeStart := utility.TimeNowCrossPlatformLocal()

stdout, stderr, err := utility.StartCommandWithStdoutStderr(backupCmd)
Expand Down
4 changes: 2 additions & 2 deletions internal/databases/greenplum/ao_storage_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type AoStorageUploader struct {
uploader *internal.Uploader
uploader internal.Uploader
baseAoFiles BackupAOFiles
meta *AOFilesMetadataDTO
metaMutex sync.Mutex
Expand All @@ -24,7 +24,7 @@ type AoStorageUploader struct {
isIncremental bool
}

func NewAoStorageUploader(uploader *internal.Uploader, baseAoFiles BackupAOFiles,
func NewAoStorageUploader(uploader internal.Uploader, baseAoFiles BackupAOFiles,
crypter crypto.Crypter, files internal.BundleFiles, isIncremental bool) *AoStorageUploader {
// Separate uploader for AO/AOCS relfiles with disabled file size tracking since
// WAL-G does not count them
Expand Down
10 changes: 5 additions & 5 deletions internal/databases/greenplum/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type SegmentFwdArg struct {

// BackupWorkers holds the external objects that the handler uses to get the backup data / write the backup data
type BackupWorkers struct {
Uploader *internal.Uploader
Uploader internal.Uploader
Conn *pgx.Conn
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (bh *BackupHandler) uploadRestorePointMetadata(restoreLSNs map[int]string)
tracelog.InfoLogger.Printf("Uploading restore point metadata file %s", metaFileName)
tracelog.InfoLogger.Println(meta.String())

if err := internal.UploadDto(bh.workers.Uploader.UploadingFolder, meta, metaFileName); err != nil {
if err := internal.UploadDto(bh.workers.Uploader.Folder(), meta, metaFileName); err != nil {
return fmt.Errorf("upload metadata file for restore point %s: %w", meta.Name, err)
}
return nil
Expand Down Expand Up @@ -426,7 +426,7 @@ func (bh *BackupHandler) uploadSentinel(sentinelDto BackupSentinelDto) (err erro
tracelog.InfoLogger.Println(sentinelDto.String())

sentinelUploader := bh.workers.Uploader
sentinelUploader.UploadingFolder = sentinelUploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath)
sentinelUploader.ChangeDirectory(utility.BaseBackupPath)
return internal.UploadSentinel(sentinelUploader, sentinelDto, bh.currBackupInfo.backupName)
}

Expand Down Expand Up @@ -561,7 +561,7 @@ func (bh *BackupHandler) fetchSegmentBackupsMetadata() (map[string]PgSegmentSent

func (bh *BackupHandler) fetchSingleMetadata(backupID string, segCfg *cluster.SegConfig) (*PgSegmentSentinelDto, error) {
// Actually, this is not a real completed backup. It is only used to fetch the segment metadata
currentBackup := NewBackup(bh.workers.Uploader.UploadingFolder, bh.currBackupInfo.backupName)
currentBackup := NewBackup(bh.workers.Uploader.Folder(), bh.currBackupInfo.backupName)

pgBackup, err := currentBackup.GetSegmentBackup(backupID, segCfg.ContentID)
if err != nil {
Expand Down Expand Up @@ -658,7 +658,7 @@ func (bh *BackupHandler) configureDeltaBackup() (err error) {
return nil
}

folder := bh.workers.Uploader.UploadingFolder
folder := bh.workers.Uploader.Folder()
previousBackupName, err := bh.arguments.deltaBaseSelector.Select(folder)
if err != nil {
if _, ok := err.(internal.NoBackupsFoundError); ok {
Expand Down
8 changes: 4 additions & 4 deletions internal/databases/greenplum/restore_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type RestorePointCreator struct {
systemIdentifier *uint64
gpVersion semver.Version

Uploader *internal.Uploader
Uploader internal.Uploader
Conn *pgx.Conn

logsDir string
Expand Down Expand Up @@ -115,7 +115,7 @@ func NewRestorePointCreator(pointName string) (rpc *RestorePointCreator, err err
gpVersion: version,
logsDir: viper.GetString(internal.GPLogsDirectory),
}
rpc.Uploader.UploadingFolder = rpc.Uploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath)
rpc.Uploader.ChangeDirectory(utility.BaseBackupPath)

return rpc, nil
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func createRestorePoint(conn *pgx.Conn, restorePointName string) (restoreLSNs ma
}

func (rpc *RestorePointCreator) checkExists() error {
exists, err := rpc.Uploader.UploadingFolder.Exists(RestorePointMetadataFileName(rpc.pointName))
exists, err := rpc.Uploader.Folder().Exists(RestorePointMetadataFileName(rpc.pointName))
if err != nil {
return fmt.Errorf("failed to check restore point existence: %v", err)
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func (rpc *RestorePointCreator) uploadMetadata(restoreLSNs map[int]string) (err
tracelog.InfoLogger.Printf("Uploading restore point metadata file %s", metaFileName)
tracelog.InfoLogger.Println(meta.String())

return internal.UploadDto(rpc.Uploader.UploadingFolder, meta, metaFileName)
return internal.UploadDto(rpc.Uploader.Folder(), meta, metaFileName)
}

type RestorePointTime struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewSegBackupHandler(arguments postgres.BackupArguments) (*postgres.BackupHa
return err
}

maker, err := NewGpTarBallComposerMaker(relStorageMap, bh.Workers.Uploader.Uploader, handler.CurBackupInfo.Name)
maker, err := NewGpTarBallComposerMaker(relStorageMap, bh.Workers.Uploader, handler.CurBackupInfo.Name)
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions internal/databases/greenplum/tar_ball_composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type GpTarBallComposerMaker struct {
relStorageMap AoRelFileStorageMap
bundleFiles internal.BundleFiles
TarFileSets internal.TarFileSets
uploader *internal.Uploader
uploader internal.Uploader
backupName string
}

func NewGpTarBallComposerMaker(relStorageMap AoRelFileStorageMap, uploader *internal.Uploader, backupName string,
func NewGpTarBallComposerMaker(relStorageMap AoRelFileStorageMap, uploader internal.Uploader, backupName string,
) (*GpTarBallComposerMaker, error) {
return &GpTarBallComposerMaker{
relStorageMap: relStorageMap,
Expand Down Expand Up @@ -69,9 +69,9 @@ func (maker *GpTarBallComposerMaker) loadBaseFiles(incrementFromName string) (Ba
var base SegBackup
// In case of delta backup, use the provided backup name as the base. Otherwise, use the latest backup.
if incrementFromName != "" {
base = NewSegBackup(maker.uploader.UploadingFolder, incrementFromName)
base = NewSegBackup(maker.uploader.Folder(), incrementFromName)
} else {
backupName, err := internal.GetLatestBackupName(maker.uploader.UploadingFolder)
backupName, err := internal.GetLatestBackupName(maker.uploader.Folder())
if err != nil {
if _, ok := err.(internal.NoBackupsFoundError); ok {
tracelog.InfoLogger.Println("Couldn't find previous backup, leaving the base files empty.")
Expand All @@ -80,7 +80,7 @@ func (maker *GpTarBallComposerMaker) loadBaseFiles(incrementFromName string) (Ba

return nil, err
}
base = NewSegBackup(maker.uploader.UploadingFolder, backupName)
base = NewSegBackup(maker.uploader.Folder(), backupName)
}

baseFilesMetadata, err := base.LoadAoFilesMetadata()
Expand All @@ -106,7 +106,7 @@ type GpTarBallComposer struct {
addFileQueue chan *internal.ComposeFileInfo
addFileWaitGroup sync.WaitGroup

uploader *internal.Uploader
uploader internal.Uploader

files internal.BundleFiles
tarFileSets internal.TarFileSets
Expand All @@ -123,7 +123,7 @@ type GpTarBallComposer struct {
func NewGpTarBallComposer(
tarBallQueue *internal.TarBallQueue, crypter crypto.Crypter, relStorageMap AoRelFileStorageMap,
bundleFiles internal.BundleFiles, packer *postgres.TarBallFilePackerImpl, aoStorageUploader *AoStorageUploader,
tarFileSets internal.TarFileSets, uploader *internal.Uploader, backupName string,
tarFileSets internal.TarFileSets, uploader internal.Uploader, backupName string,
) (*GpTarBallComposer, error) {
errorGroup, ctx := errgroup.WithContext(context.Background())

Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *GpTarBallComposer) FinishComposing() (internal.TarFileSets, error) {

c.addFileWaitGroup.Wait()

err = internal.UploadDto(c.uploader.UploadingFolder, c.aoStorageUploader.GetFiles(), getAOFilesMetadataPath(c.backupName))
err = internal.UploadDto(c.uploader.Folder(), c.aoStorageUploader.GetFiles(), getAOFilesMetadataPath(c.backupName))
if err != nil {
return nil, fmt.Errorf("failed to upload AO files metadata: %v", err)
}
Expand Down
Loading

0 comments on commit 5fad985

Please sign in to comment.