diff --git a/docs/Greenplum.md b/docs/Greenplum.md index cbb7b314a..fe9e457e0 100644 --- a/docs/Greenplum.md +++ b/docs/Greenplum.md @@ -62,6 +62,28 @@ After the successful configuration, use the `backup-push` command from the coord wal-g backup-push --config=/path/to/config.yaml ``` +#### Delta backups (work in progress) + +* `WALG_DELTA_MAX_STEPS` + +A delta backup stores the difference between a previously taken backup and the present state. `WALG_DELTA_MAX_STEPS` determines how many delta backups can be taken between full backups. Defaults to 0. +The restoration process automatically fetches all necessary deltas and the base backup and composes a valid restored backup (you still need the WALs written after the start of the last backup to restore a consistent cluster). + +Delta computation is based on the file system ModTime and the LSN of pages in data files for heap relations, and on the ModCount + EOF combination for AO/AOCS relations. + +##### Create delta from specific backup +When creating a delta backup (`WALG_DELTA_MAX_STEPS` > 0), WAL-G uses the latest backup as the base by default. This behaviour can be changed via the following flags: + +* `--delta-from-name` flag or `WALG_DELTA_FROM_NAME` environment variable to choose the backup with the specified name as the base for the delta backup + +* `--delta-from-user-data` flag or `WALG_DELTA_FROM_USER_DATA` environment variable to choose the backup with the specified user data as the base for the delta backup + +Examples: +```bash +wal-g backup-push --delta-from-name backup_name --config=/path/to/config.yaml +wal-g backup-push --delta-from-user-data "{ \"x\": [3], \"y\": 4 }" --config=/path/to/config.yaml +``` + ### ``backup-fetch`` When fetching base backups, the user should pass in the cluster restore configuration and the name of the backup. 
diff --git a/internal/databases/greenplum/ao_increment.go b/internal/databases/greenplum/ao_increment.go index 9fc85b1f7..d13130bd4 100644 --- a/internal/databases/greenplum/ao_increment.go +++ b/internal/databases/greenplum/ao_increment.go @@ -133,7 +133,10 @@ func ApplyFileIncrement(fileName string, increment io.Reader, fsync bool) error return nil } -func newIncrementalPageReader(file io.ReadSeekCloser, eof, offset int64) (io.ReadCloser, error) { +func NewIncrementalPageReader(file io.ReadSeekCloser, eof, offset int64) (io.ReadCloser, error) { + if eof <= offset { + return nil, fmt.Errorf("file eof %d is less or equal than offset %d", eof, offset) + } var headerBuffer bytes.Buffer headerBuffer.Write(IncrementFileHeader) headerBuffer.Write(utility.ToBytes(uint64(eof))) diff --git a/internal/databases/greenplum/ao_increment_test.go b/internal/databases/greenplum/ao_increment_test.go new file mode 100644 index 000000000..141c9b081 --- /dev/null +++ b/internal/databases/greenplum/ao_increment_test.go @@ -0,0 +1,73 @@ +package greenplum_test + +import ( + "bytes" + "fmt" + "github.com/stretchr/testify/assert" + "github.com/wal-g/wal-g/internal/databases/greenplum" + "github.com/wal-g/wal-g/internal/walparser/parsingutil" + "io" + "os" + "testing" +) + +const aoSegmentFileName = "../../../test/testdata/gp_ao_file.bin" +const aoSegmentFileSizeBytes = 192 + +func TestReadIncrement(t *testing.T) { + gpReadIncrement(10, 100, t) +} + +func TestReadIncrementFull(t *testing.T) { + gpReadIncrement(0, aoSegmentFileSizeBytes, t) +} + +// TestFailOnIncorrectOffset checks that NewIncrementalPageReader returns an error when eof is less than or equal to offset. +func TestFailOnIncorrectOffset(t *testing.T) { + file, err := os.Open(aoSegmentFileName) + if err != nil { + // Fail fast: without the fixture the assertions below would run against a nil file. + t.Fatal(err) + } + defer file.Close() + + _, err = greenplum.NewIncrementalPageReader(file, aoSegmentFileSizeBytes, aoSegmentFileSizeBytes) + assert.Error(t, err) + + _, err = greenplum.NewIncrementalPageReader(file, 0, aoSegmentFileSizeBytes) + assert.Error(t, err) +} + +func gpReadIncrement(offset, eof int64, t *testing.T) { + file, err := 
os.Open(aoSegmentFileName) + if err != nil { + fmt.Print(err.Error()) + } + + reader, err := greenplum.NewIncrementalPageReader(file, eof, offset) + assert.NoError(t, err) + + increment, err := io.ReadAll(reader) + assert.NoError(t, err) + + incrementBuf := bytes.NewBuffer(increment) + err = greenplum.ReadIncrementFileHeader(incrementBuf) + assert.NoError(t, err) + + var parsedEof uint64 + var parsedOffset uint64 + err = parsingutil.ParseMultipleFieldsFromReader([]parsingutil.FieldToParse{ + {Field: &parsedEof, Name: "eof"}, + {Field: &parsedOffset, Name: "offset"}, + }, incrementBuf) + + assert.Equal(t, parsedOffset, uint64(offset)) + assert.Equal(t, parsedEof, uint64(eof)) + + _, _ = file.Seek(offset, io.SeekStart) + + fileFragment := new(bytes.Buffer) + _, _ = io.CopyN(fileFragment, file, eof-offset) + + assert.True(t, bytes.Equal(fileFragment.Bytes(), incrementBuf.Bytes())) +} diff --git a/internal/databases/greenplum/ao_storage_manager.go b/internal/databases/greenplum/ao_storage_uploader.go similarity index 98% rename from internal/databases/greenplum/ao_storage_manager.go rename to internal/databases/greenplum/ao_storage_uploader.go index 06a68860d..5cacc258d 100644 --- a/internal/databases/greenplum/ao_storage_manager.go +++ b/internal/databases/greenplum/ao_storage_uploader.go @@ -104,7 +104,7 @@ func (u *AoStorageUploader) GetFiles() *AOFilesMetadataDTO { func (u *AoStorageUploader) skipAoUpload(cfi *internal.ComposeFileInfo, aoMeta AoRelFileMetadata, storageKey string) error { u.addAoFileMetadata(cfi, storageKey, aoMeta, true, false) - u.bundleFiles.AddFile(cfi.Header, cfi.FileInfo, false) + u.bundleFiles.AddSkippedFile(cfi.Header, cfi.FileInfo) tracelog.DebugLogger.Printf("Skipping %s AO relfile (already exists in storage as %s)", cfi.Path, storageKey) return nil } @@ -147,7 +147,7 @@ func (u *AoStorageUploader) incrementalAoUpload( return err } - incrementalReader, err := newIncrementalPageReader(file, aoMeta.eof, baseFileEOF) + incrementalReader, err 
:= NewIncrementalPageReader(file, aoMeta.eof, baseFileEOF) if err != nil { return err } diff --git a/internal/databases/greenplum/ao_storage_uploader_test.go b/internal/databases/greenplum/ao_storage_uploader_test.go new file mode 100644 index 000000000..e8ff4ff68 --- /dev/null +++ b/internal/databases/greenplum/ao_storage_uploader_test.go @@ -0,0 +1,403 @@ +package greenplum_test + +import ( + "archive/tar" + "fmt" + "github.com/stretchr/testify/assert" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/crypto/openpgp" + "github.com/wal-g/wal-g/internal/databases/greenplum" + "github.com/wal-g/wal-g/internal/walparser" + "github.com/wal-g/wal-g/pkg/storages/memory" + "github.com/wal-g/wal-g/testtools" + "github.com/wal-g/wal-g/utility" + "io" + "os" + "path/filepath" + "testing" + "time" +) + +type TestFileInfo struct { + internal.ComposeFileInfo + greenplum.AoRelFileMetadata + walparser.BlockLocation +} + +type ExpectedResult struct { + StoragePath string + IsSkipped bool + IsIncremented bool + StorageType greenplum.RelStorageType + EOF int64 + ModCount int64 +} + +const PrivateKeyFilePath = "../../../test/testdata/waleGpgKey" + +func TestRegularAoUpload(t *testing.T) { + baseFiles := make(greenplum.BackupAOFiles) + bundleFiles := &internal.RegularBundleFiles{} + testFiles := map[string]TestFileInfo{ + "1663.1": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.ColumnOriented, 100, 3), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 1009, + DBNode: 13, + RelNode: 1663, + }, + BlockNo: 1, + }, + }, + "1337.120": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.AppendOptimized, 60, 4), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 0, + DBNode: 13, + RelNode: 1337, + }, + BlockNo: 120, + }, + }, + "1337.60": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", 
greenplum.AppendOptimized, 77, 5), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 0, + DBNode: 13, + RelNode: 1337, + }, + BlockNo: 60, + }, + }, + } + expectedResults := map[string]ExpectedResult{ + "1337.60": { + StoragePath: "0_13_md5summock_1337_60_5_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.AppendOptimized, + EOF: 77, + ModCount: 5, + }, + "1663.1": { + StoragePath: "1009_13_md5summock_1663_1_3_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.ColumnOriented, + EOF: 100, + ModCount: 3, + }, + "1337.120": { + StoragePath: "0_13_md5summock_1337_120_4_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.AppendOptimized, + EOF: 60, + ModCount: 4, + }, + } + runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults) +} + +func TestIncrementalAoUpload(t *testing.T) { + baseFiles := greenplum.BackupAOFiles{ + "1337.120": { + StoragePath: "0_13_md5summock_1337_120_4_aoseg", + IsSkipped: false, + IsIncremented: false, + MTime: time.Now(), + StorageType: greenplum.AppendOptimized, + EOF: 60, + ModCount: 4, + Compressor: "", + FileMode: 420, + }, + } + bundleFiles := &internal.RegularBundleFiles{} + testFiles := map[string]TestFileInfo{ + "1663.1": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.ColumnOriented, 100, 3), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 0, + DBNode: 13, + RelNode: 1663, + }, + BlockNo: 1, + }, + }, + "1337.120": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.AppendOptimized, 70, 5), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 0, + DBNode: 13, + RelNode: 1337, + }, + BlockNo: 120, + }, + }, + "1337.60": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.AppendOptimized, 77, 5), + BlockLocation: 
walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 0, + DBNode: 13, + RelNode: 1337, + }, + BlockNo: 60, + }, + }, + } + expectedResults := map[string]ExpectedResult{ + "1337.60": { + StoragePath: "0_13_md5summock_1337_60_5_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.AppendOptimized, + EOF: 77, + ModCount: 5, + }, + "1663.1": { + StoragePath: "0_13_md5summock_1663_1_3_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.ColumnOriented, + EOF: 100, + ModCount: 3, + }, + "1337.120": { + StoragePath: "0_13_md5summock_1337_120_4_D_5_aoseg", + IsSkipped: false, + IsIncremented: true, + StorageType: greenplum.AppendOptimized, + EOF: 70, + ModCount: 5, + }, + } + runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults) +} + +func TestIncrementalAoUpload_EqualEof_DifferentModCount(t *testing.T) { + baseFiles := greenplum.BackupAOFiles{ + "1663.1": { + StoragePath: "1009_13_md5summock_1663_1_4_aoseg", + IsSkipped: false, + IsIncremented: false, + MTime: time.Now(), + StorageType: greenplum.ColumnOriented, + EOF: 100, + ModCount: 4, + Compressor: "", + FileMode: 420, + }, + } + bundleFiles := &internal.RegularBundleFiles{} + testFiles := map[string]TestFileInfo{ + "1663.1": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.ColumnOriented, 100, 5), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 1009, + DBNode: 13, + RelNode: 1663, + }, + BlockNo: 1, + }, + }, + } + expectedResults := map[string]ExpectedResult{ + "1663.1": { + StoragePath: "1009_13_md5summock_1663_1_5_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.ColumnOriented, + EOF: 100, + ModCount: 5, + }, + } + runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults) +} + +func TestIncrementalAoUpload_DifferentEof_EqualModCount(t *testing.T) { + baseFiles := greenplum.BackupAOFiles{ + "1663.1": { + 
StoragePath: "1009_13_md5summock_1663_1_4_aoseg", + IsSkipped: false, + IsIncremented: false, + MTime: time.Now(), + StorageType: greenplum.ColumnOriented, + EOF: 70, + ModCount: 4, + Compressor: "", + FileMode: 420, + }, + } + bundleFiles := &internal.RegularBundleFiles{} + testFiles := map[string]TestFileInfo{ + "1663.1": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.ColumnOriented, 100, 4), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 1009, + DBNode: 13, + RelNode: 1663, + }, + BlockNo: 1, + }, + }, + } + expectedResults := map[string]ExpectedResult{ + "1663.1": { + StoragePath: "1009_13_md5summock_1663_1_4_aoseg", + IsSkipped: false, + IsIncremented: false, + StorageType: greenplum.ColumnOriented, + EOF: 100, + ModCount: 4, + }, + } + runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults) +} + +func TestAoUpload_SkippedFile(t *testing.T) { + baseFiles := greenplum.BackupAOFiles{ + "1663.1": { + StoragePath: "1009_13_md5summock_1663_1_4_aoseg", + IsSkipped: false, + IsIncremented: false, + MTime: time.Now(), + StorageType: greenplum.ColumnOriented, + EOF: 70, + ModCount: 4, + Compressor: "", + FileMode: 420, + }, + } + bundleFiles := &internal.RegularBundleFiles{} + testFiles := map[string]TestFileInfo{ + "1663.1": { + AoRelFileMetadata: greenplum.NewAoRelFileMetadata("md5summock", greenplum.ColumnOriented, 70, 4), + BlockLocation: walparser.BlockLocation{ + RelationFileNode: walparser.RelFileNode{ + SpcNode: 1009, + DBNode: 13, + RelNode: 1663, + }, + BlockNo: 1, + }, + }, + } + expectedResults := map[string]ExpectedResult{ + "1663.1": { + StoragePath: "1009_13_md5summock_1663_1_4_aoseg", + IsSkipped: true, + IsIncremented: false, + StorageType: greenplum.ColumnOriented, + EOF: 70, + ModCount: 4, + }, + } + runSingleTest(t, baseFiles, bundleFiles, testFiles, expectedResults) +} + +func runSingleTest(t *testing.T, baseFiles greenplum.BackupAOFiles, + bundleFiles 
*internal.RegularBundleFiles, testFiles map[string]TestFileInfo, expectedResults map[string]ExpectedResult) { + uploader := newAoStorageUploader(baseFiles, bundleFiles, true) + testDir, testFiles := generateData("data", testFiles, t) + defer os.RemoveAll(testDir) + + for _, testFile := range testFiles { + cfi := testFile.ComposeFileInfo + aoMeta := testFile.AoRelFileMetadata + location := testFile.BlockLocation + err := uploader.AddFile(&cfi, aoMeta, &location) + assert.NoError(t, err) + } + + filesMetaDto := uploader.GetFiles() + assert.Equal(t, len(expectedResults), len(filesMetaDto.Files)) + + bundleFilesMap := bundleFiles.GetUnderlyingMap() + + for name, resFile := range filesMetaDto.Files { + assert.Contains(t, expectedResults, name) + expFile := expectedResults[name] + assert.Equal(t, expFile.StoragePath, resFile.StoragePath) + assert.Equal(t, expFile.IsSkipped, resFile.IsSkipped) + assert.Equal(t, expFile.IsIncremented, resFile.IsIncremented) + assert.Equal(t, expFile.StorageType, resFile.StorageType) + assert.Equal(t, expFile.EOF, resFile.EOF) + assert.Equal(t, expFile.ModCount, resFile.ModCount) + + fileDescRaw, ok := bundleFilesMap.Load(name) + assert.True(t, ok) + fileDesc := fileDescRaw.(internal.BackupFileDescription) + assert.Equal(t, expFile.IsSkipped, fileDesc.IsSkipped) + assert.Equal(t, expFile.IsIncremented, fileDesc.IsIncremented) + } +} + +func newAoStorageUploader( + baseFiles greenplum.BackupAOFiles, bundleFiles internal.BundleFiles, isIncremental bool, +) *greenplum.AoStorageUploader { + storage := memory.NewStorage() + mockUploader := testtools.NewStoringMockUploader(storage) + crypter := openpgp.CrypterFromKeyPath(PrivateKeyFilePath, func() (string, bool) { + return "", false + }) + aoUploader := greenplum.NewAoStorageUploader(mockUploader, baseFiles, crypter, bundleFiles, isIncremental) + return aoUploader +} + +func generateData(dirName string, testFiles map[string]TestFileInfo, t *testing.T) (string, map[string]TestFileInfo) { + cwd, 
err := filepath.Abs("./") + if err != nil { + t.Fatal(err) + } + + // Create the temp directory next to the test sources; the caller removes it. + dir, err := os.MkdirTemp(cwd, dirName) + if err != nil { + t.Fatal(err) + } + fmt.Println(dir) + + sb := testtools.NewStrideByteReader(10) + + // Generate a 100-byte file for every entry in testFiles. + for name, tfi := range testFiles { + lr := &io.LimitedReader{ + R: sb, + N: int64(100), + } + f, err := os.Create(filepath.Join(dir, name)) + if err != nil { + t.Fatal(err) + } + io.Copy(f, lr) + + fInfo, err := f.Stat() + if err != nil { + t.Fatal(err) + } + + header, err := tar.FileInfoHeader(fInfo, f.Name()) + if err != nil { + t.Fatal(err) + } + + header.Name = name + + cfi := internal.NewComposeFileInfo(f.Name(), fInfo, false, false, header) + tfi.ComposeFileInfo = *cfi + testFiles[name] = tfi + + defer utility.LoggedClose(f, "") + } + + return dir, testFiles +} diff --git a/internal/databases/greenplum/relfile_storage_map.go b/internal/databases/greenplum/relfile_storage_map.go index eb1d47d0e..dd7beeaff 100644 --- a/internal/databases/greenplum/relfile_storage_map.go +++ b/internal/databases/greenplum/relfile_storage_map.go @@ -15,6 +15,16 @@ const ( ColumnOriented RelStorageType = 'c' ) +// NewAoRelFileMetadata builds an AoRelFileMetadata from the given relation name MD5, storage type, EOF and modification count. +func NewAoRelFileMetadata(relNameMd5 string, storageType RelStorageType, eof, modCount int64) AoRelFileMetadata { + return AoRelFileMetadata{ + relNameMd5: relNameMd5, + storageType: storageType, + eof: eof, + modCount: modCount, + } +} + type AoRelFileMetadata struct { relNameMd5 string storageType RelStorageType diff --git a/internal/databases/postgres/delta_file_manager_test.go b/internal/databases/postgres/delta_file_manager_test.go index 58e4270d3..3659ba8bb 100644 --- a/internal/databases/postgres/delta_file_manager_test.go +++ b/internal/databases/postgres/delta_file_manager_test.go @@ -205,7 +205,7 @@ func TestFlushDeltaFiles_CompleteFile(t *testing.T) { assert.NoError(t, err) manager.DeltaFileWriters.Store(DeltaFilename, postgres.NewDeltaFileChanWriter(deltaFile)) storage := memory.NewStorage() - 
manager.FlushDeltaFiles(testtools.NewStoringMockUploader(storage, nil), map[string]bool{ + manager.FlushDeltaFiles(testtools.NewStoringMockUploader(storage), map[string]bool{ postgres.ToPartFilename(DeltaFilename): true, }) diff --git a/test/testdata/gp_ao_file.bin b/test/testdata/gp_ao_file.bin new file mode 100644 index 000000000..0678df235 Binary files /dev/null and b/test/testdata/gp_ao_file.bin differ diff --git a/testtools/util.go b/testtools/util.go index 57789ed89..9762cb265 100644 --- a/testtools/util.go +++ b/testtools/util.go @@ -18,7 +18,6 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/wal-g/wal-g/internal" - "github.com/wal-g/wal-g/internal/fsutil" "github.com/wal-g/wal-g/internal/walparser" "github.com/wal-g/wal-g/pkg/storages/memory" "github.com/wal-g/wal-g/pkg/storages/s3" @@ -54,7 +53,7 @@ func NewMockUploader(apiMultiErr, apiErr bool) *internal.Uploader { ) } -func NewStoringMockUploader(storage *memory.Storage, deltaDataFolder fsutil.DataFolder) *internal.Uploader { +func NewStoringMockUploader(storage *memory.Storage) *internal.Uploader { return internal.NewUploader( &MockCompressor{}, memory.NewFolder("in_memory/", storage),