FMWK-570 Add backup state/continuation for backup/restore operation #158

Open · wants to merge 25 commits into base `main`
README.md (4 additions & 2 deletions)
@@ -43,6 +43,7 @@ func main() {
 
 	// For backup to single file use local.WithFile(fileName)
 	writers, err := local.NewWriter(
+		context.Background(),
 		local.WithRemoveFiles(),
 		local.WithDir("backups_folder"),
 	)
@@ -52,10 +53,11 @@ func main() {
 
 	backupCfg := backup.NewDefaultBackupConfig()
 	backupCfg.Namespace = "test"
-	backupCfg.Parallel = 5
+	backupCfg.ParallelRead = 10
+	backupCfg.ParallelWrite = 10
 	ctx := context.Background()
 
-	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers)
+	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
 	if err != nil {
 		panic(err)
 	}
client.go (11 additions & 2 deletions)
@@ -195,10 +195,12 @@ func (c *Client) getUsableScanPolicy(p *a.ScanPolicy) *a.ScanPolicy {
 // - ctx can be used to cancel the backup operation.
 // - config is the configuration for the backup operation.
 // - writer creates new writers for the backup operation.
+// - reader is used only for reading a state file for continuation operations.
 func (c *Client) Backup(
 	ctx context.Context,
 	config *BackupConfig,
 	writer Writer,
+	reader StreamingReader,
 ) (*BackupHandler, error) {
 	if config == nil {
 		return nil, fmt.Errorf("backup config required")
@@ -212,7 +214,11 @@ func (c *Client) Backup(
 		return nil, fmt.Errorf("failed to validate backup config: %w", err)
 	}
 
-	handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer, c.scanLimiter)
+	handler, err := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer, reader, c.scanLimiter)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create backup handler: %w", err)
+	}
 
 	handler.run()
 
 	return handler, nil
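
To make the new signature concrete, here is a minimal sketch that continues the README example above and resumes an interrupted backup. Only the fourth `reader` argument and the config fields `StateFile`, `Continue`, `SyncPipelines`, and `PageSize` appear in this diff; `local.NewReader` and its options are assumptions modeled on the `local.NewWriter` call shown earlier.

```go
// Sketch only: resume a backup from a previously saved state file.
// local.NewReader and its options are assumed to mirror local.NewWriter.
reader, err := local.NewReader(
	context.Background(),
	local.WithDir("backups_folder"), // directory that holds backup.state
)
if err != nil {
	panic(err)
}

backupCfg := backup.NewDefaultBackupConfig()
backupCfg.Namespace = "test"
backupCfg.StateFile = "backup.state" // what --continue maps to in configs.go
backupCfg.Continue = true
backupCfg.SyncPipelines = true
backupCfg.PageSize = 10_000

// The reader is consulted only to load the state file before the scan resumes.
backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, reader)
if err != nil {
	panic(err)
}
```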
@@ -271,7 +277,10 @@ func (c *Client) Estimate(
 		return 0, fmt.Errorf("failed to validate backup config: %w", err)
 	}
 
-	handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, nil, c.scanLimiter)
+	handler, err := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, nil, nil, c.scanLimiter)
+	if err != nil {
+		return 0, fmt.Errorf("failed to create backup handler: %w", err)
+	}
 
 	result, err := handler.getEstimate(ctx, estimateSamples)
 	if err != nil {
cmd/asbackup/readme.md (13 additions & 9 deletions)
@@ -24,6 +24,9 @@ More info here https://goreleaser.com/quick-start/
 
 ## Supported flags
 ```
+Welcome to the Aerospike backup CLI tool!
+-----------------------------------------
+
Review comment (Member) on lines +27 to +29: Did we add this line to the usage output?

 Usage:
   asbackup [flags]
 
@@ -71,6 +74,7 @@ Backup Flags:
                                   there is no socket idle time limit (default 10000)
   -N, --nice int                  The limits for read/write storage bandwidth in MiB/s
   -o, --output-file string        Backup to a single backup file. Use - for stdout. Required, unless -d or -e is used.
+  -q, --output-file-prefix string When using directory parameter, prepend a prefix to the names of the generated files.
   -r, --remove-files              Remove existing backup file (-o) or files (-d).
   -F, --file-limit int            Rotate backup files, when their size crosses the given
                                   value (in bytes) Only used when backing up to a Directory.
@@ -127,6 +131,15 @@ Backup Flags:
                                   after-digest, partition-list.
                                   It calculates the estimated size of a full backup.
       --estimate-samples int      The number of samples to take when running a backup estimate. (default 10000)
+  -c, --continue string           Resumes an interrupted/failed backup from where it was left off, given the .state file
+                                  that was generated from the interrupted/failed run.
+      --state-file-dst string     Name of a state file that will be saved in the backup --directory.
+                                  Works only with the --file-limit parameter. When --file-limit is reached and the
+                                  file is closed, the current state is saved. Works only for default and/or partition backup.
+                                  Does not work with --parallel-nodes or --node-list.
+      --scan-page-size int        How many records will be read in one iteration for a continuation backup.
+                                  Affects the size of the overlap when resuming a backup after an error.
+                                  Used only with --state-file-dst or --continue. (default 10000)
 
 Compression Flags:
   -z, --compress string           Enables compressing of backup files using the specified compression algorithm.
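
For illustration, a hypothetical pair of invocations that combine the new flags (semantics as documented above; the namespace, directory, and file names are made up):

```
# Save state whenever a backup file is rotated via --file-limit.
asbackup -d backups_folder -n test --file-limit 268435456 --state-file-dst backup.state

# Resume from the saved state later, keeping the same page size.
asbackup -d backups_folder -n test --continue backup.state --scan-page-size 10000
```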
@@ -173,15 +186,6 @@ Azure Flags:
 
 ## Unsupported flags
 ```
---continue              Resumes an interrupted/failed backup from where it was left off, given the .state file
-                        that was generated from the interrupted/failed run.
-
---state-file-dst        Either a path with a file name or a directory in which the backup state file will be
-                        placed if the backup is interrupted/fails. If a path with a file name is used, that
-                        exact path is where the backup file will be placed. If a directory is given, the backup
-                        state will be placed in the directory with name `<namespace>.asb.state`, or
-                        `<prefix>.asb.state` if `--output-file-prefix` is given.
-
 --machine               Output machine-readable status updates to the given path, typically a FIFO.
 
 --no-config-file        Do not read any config file. Default: disabled
cmd/internal/app/asbackup.go (22 additions & 2 deletions)
@@ -30,6 +30,8 @@ type ASBackup struct {
 	backupClient *backup.Client
 	backupConfig *backup.BackupConfig
 	writer       backup.Writer
+	// reader is used to read state file.
+	reader backup.StreamingReader
 	// Additional params.
 	isEstimate       bool
 	estimatesSamples int64
@@ -60,11 +62,19 @@ func NewASBackup(
 	// Initializations.
 	var (
 		writer backup.Writer
+		reader backup.StreamingReader
 		err    error
 	)
 	// We initialize a writer only if output is configured.
 	if backupParams.OutputFile != "" || commonParams.Directory != "" {
-		writer, err = getWriter(ctx, backupParams, commonParams, awsS3, gcpStorage, azureBlob)
+		writer, err = getWriter(
+			ctx,
+			backupParams,
+			commonParams,
+			awsS3,
+			gcpStorage,
+			azureBlob,
+		)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create backup writer: %w", err)
 		}
@@ -75,6 +85,15 @@ func NewASBackup(
 		}
 	}
 
+	if backupParams.ShouldSaveState() {
+		r := &models.Restore{InputFile: backupParams.OutputFile}
+
+		reader, err = getReader(ctx, r, commonParams, awsS3, gcpStorage, azureBlob, backupParams)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create reader: %w", err)
+		}
+	}
+
 	aerospikeClient, err := newAerospikeClient(clientConfig, backupParams.PreferRacks)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create aerospike client: %w", err)
@@ -100,6 +119,7 @@ func NewASBackup(
 		backupClient:     backupClient,
 		backupConfig:     backupConfig,
 		writer:           writer,
+		reader:           reader,
 		isEstimate:       backupParams.Estimate,
 		estimatesSamples: backupParams.EstimateSamples,
 	}, nil
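
`ShouldSaveState` is called above but not defined in this diff. Presumably it reports whether any state-related flag was set; a plausible sketch in the `models` package, assuming the field names from the flag mapping in configs.go:

```go
// Assumed implementation: a state file is written (or read) whenever
// --state-file-dst or --continue was provided.
func (b *Backup) ShouldSaveState() bool {
	return b.StateFileDst != "" || b.Continue != ""
}
```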
@@ -120,7 +140,7 @@ func (b *ASBackup) Run(ctx context.Context) error {
 
 		printEstimateReport(estimates)
 	default:
-		h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer)
+		h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer, b.reader)
 		if err != nil {
 			return fmt.Errorf("failed to start backup: %w", err)
 		}
cmd/internal/app/asrestore.go (6 additions & 5 deletions)
@@ -49,7 +49,7 @@ func NewASRestore(
 		return nil, err
 	}
 
-	reader, err := getReader(ctx, restoreParams, commonParams, awsS3, gcpStorage, azureBlob)
+	reader, err := getReader(ctx, restoreParams, commonParams, awsS3, gcpStorage, azureBlob, nil)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create backup reader: %w", err)
 	}
@@ -108,15 +108,16 @@ func getReader(
 	awsS3 *models.AwsS3,
 	gcpStorage *models.GcpStorage,
 	azureBlob *models.AzureBlob,
+	backupParams *models.Backup,
 ) (backup.StreamingReader, error) {
 	switch {
 	case awsS3.Region != "":
-		return newS3Reader(ctx, awsS3, restoreParams, commonParams)
+		return newS3Reader(ctx, awsS3, restoreParams, commonParams, backupParams)
 	case gcpStorage.BucketName != "":
-		return newGcpReader(ctx, gcpStorage, restoreParams, commonParams)
+		return newGcpReader(ctx, gcpStorage, restoreParams, commonParams, backupParams)
 	case azureBlob.ContainerName != "":
-		return newAzureReader(ctx, azureBlob, restoreParams, commonParams)
+		return newAzureReader(ctx, azureBlob, restoreParams, commonParams, backupParams)
 	default:
-		return newLocalReader(restoreParams, commonParams)
+		return newLocalReader(restoreParams, commonParams, backupParams)
 	}
 }
cmd/internal/app/configs.go (14 additions & 3 deletions)
@@ -59,10 +59,23 @@ func mapBackupConfig(
 	c.ParallelWrite = commonParams.Parallel
 	c.ParallelRead = commonParams.Parallel
 	// As we set --nice in MiB we must convert it to bytes
-	// TODO: make Bandwidth int64 to avoid overflow.
 	c.Bandwidth = commonParams.Nice * 1024 * 1024
 	c.Compact = backupParams.Compact
 	c.NoTTLOnly = backupParams.NoTTLOnly
+	c.OutputFilePrefix = backupParams.OutputFilePrefix
+
+	if backupParams.Continue != "" {
+		c.StateFile = backupParams.Continue
+		c.Continue = true
+		c.SyncPipelines = true
+		c.PageSize = backupParams.ScanPageSize
+	}
+
+	if backupParams.StateFileDst != "" {
+		c.StateFile = backupParams.StateFileDst
+		c.SyncPipelines = true
+		c.PageSize = backupParams.ScanPageSize
+	}
 
 	// Overwrite partitions if we use nodes.
 	if backupParams.ParallelNodes || backupParams.NodeList != "" {
@@ -135,7 +148,6 @@ func mapRestoreConfig(
 	c.WritePolicy = mapWritePolicy(restoreParams, commonParams)
 	c.InfoPolicy = mapInfoPolicy(restoreParams.TimeOut)
 	// As we set --nice in MiB we must convert it to bytes
-	// TODO: make Bandwidth int64 to avoid overflow.
 	c.Bandwidth = commonParams.Nice * 1024 * 1024
 	c.ExtraTTL = restoreParams.ExtraTTL
 	c.IgnoreRecordError = restoreParams.IgnoreRecordError
@@ -288,7 +300,6 @@ func recordExistsAction(replace, unique bool) aerospike.RecordExistsAction {
 	}
 }
 
-// TODO: why no info policy timeout is set for backup in C tool?
 func mapInfoPolicy(timeOut int64) *aerospike.InfoPolicy {
 	p := aerospike.NewInfoPolicy()
 	p.Timeout = time.Duration(timeOut) * time.Millisecond
cmd/internal/app/readers.go (28 additions & 5 deletions)
@@ -26,11 +26,16 @@ import (
 	"github.com/aerospike/backup-go/io/local"
 )
 
-func newLocalReader(r *models.Restore, c *models.Common) (backup.StreamingReader, error) {
+func newLocalReader(r *models.Restore, c *models.Common, b *models.Backup) (backup.StreamingReader, error) {
 	var opts []local.Opt
 
 	if c.Directory != "" && r.InputFile == "" {
-		opts = append(opts, local.WithDir(c.Directory), local.WithValidator(asb.NewValidator()))
+		opts = append(opts, local.WithDir(c.Directory))
+		// Append Validator only if backup params are not set.
+		// That means we don't need to check that we are saving a state file.
+		if b == nil {
+			opts = append(opts, local.WithValidator(asb.NewValidator()))
+		}
Review comment (Member) on lines +34 to +38: Consider extracting this block into a method and reusing it.

 	}
 
 	if r.InputFile != "" && c.Directory == "" {
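
Following the review suggestion, the repeated validator block could be pulled into one generic helper, since each storage package has its own option type. A sketch only, with `appendValidator` being a hypothetical name:

```go
// appendValidator adds the ASB validator option only when no backup params
// are set, i.e. the reader serves a restore rather than a state-file read.
// T stands in for local.Opt, s3.Opt, storage.Opt, or blob.Opt.
func appendValidator[T any](opts []T, b *models.Backup, validator T) []T {
	if b == nil {
		return append(opts, validator)
	}
	return opts
}
```

Each call site would then collapse to `opts = appendValidator(opts, b, local.WithValidator(asb.NewValidator()))`.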
@@ -45,6 +50,7 @@ func newS3Reader(
 	a *models.AwsS3,
 	r *models.Restore,
 	c *models.Common,
+	b *models.Backup,
 ) (backup.StreamingReader, error) {
 	client, err := newS3Client(ctx, a)
 	if err != nil {
@@ -57,7 +63,12 @@ func newS3Reader(
 
 	if c.Directory != "" && r.InputFile == "" {
 		bucketName, path = getBucketFromPath(c.Directory)
-		opts = append(opts, s3.WithDir(path), s3.WithValidator(asb.NewValidator()))
+		opts = append(opts, s3.WithDir(path))
+		// Append Validator only if backup params are not set.
+		// That means we don't need to check that we are saving a state file.
+		if b == nil {
+			opts = append(opts, s3.WithValidator(asb.NewValidator()))
+		}
 	}
 
 	if r.InputFile != "" && c.Directory == "" {
@@ -73,6 +84,7 @@ func newGcpReader(
 	g *models.GcpStorage,
 	r *models.Restore,
 	c *models.Common,
+	b *models.Backup,
 ) (backup.StreamingReader, error) {
 	client, err := newGcpClient(ctx, g)
 	if err != nil {
@@ -82,7 +94,12 @@ func newGcpReader(
 
 	opts := make([]storage.Opt, 0)
 
 	if c.Directory != "" && r.InputFile == "" {
-		opts = append(opts, storage.WithDir(c.Directory), storage.WithValidator(asb.NewValidator()))
+		opts = append(opts, storage.WithDir(c.Directory))
+		// Append Validator only if backup params are not set.
+		// That means we don't need to check that we are saving a state file.
+		if b == nil {
+			opts = append(opts, storage.WithValidator(asb.NewValidator()))
+		}
 	}
 
 	if r.InputFile != "" && c.Directory == "" {
@@ -97,6 +114,7 @@ func newAzureReader(
 	a *models.AzureBlob,
 	r *models.Restore,
 	c *models.Common,
+	b *models.Backup,
 ) (backup.StreamingReader, error) {
 	client, err := newAzureClient(a)
 	if err != nil {
@@ -106,7 +124,12 @@ func newAzureReader(
 
 	opts := make([]blob.Opt, 0)
 
 	if c.Directory != "" && r.InputFile == "" {
-		opts = append(opts, blob.WithDir(c.Directory), blob.WithValidator(asb.NewValidator()))
+		opts = append(opts, blob.WithDir(c.Directory))
+		// Append Validator only if backup params are not set.
+		// That means we don't need to check that we are saving a state file.
+		if b == nil {
+			opts = append(opts, blob.WithValidator(asb.NewValidator()))
+		}
 	}
 
 	if r.InputFile != "" && c.Directory == "" {