
FMWK-570-backup-restore-state
- cleaning the code
filkeith committed Oct 10, 2024
1 parent a11e874 commit 2d87705
Showing 30 changed files with 318 additions and 197 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -43,6 +43,7 @@ func main() {

// For backup to single file use local.WithFile(fileName)
writers, err := local.NewWriter(
context.Background(),
local.WithRemoveFiles(),
local.WithDir("backups_folder"),
)
@@ -52,10 +53,11 @@ func main() {

backupCfg := backup.NewDefaultBackupConfig()
backupCfg.Namespace = "test"
backupCfg.Parallel = 5
backupCfg.ParallelRead = 10
backupCfg.ParallelWrite = 10
ctx := context.Background()

backupHandler, err := backupClient.Backup(ctx, backupCfg, writers)
backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
}
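The `nil` passed as the new fourth argument to `Backup` is the optional state-file reader; it is only consulted when resuming from a saved state. Below is a minimal sketch of a resumable call, assuming the `local` package exposes a `NewReader` counterpart to `NewWriter` and using the `StateFile`/`Continue` config fields introduced in this commit; the state file name is hypothetical.

```go
// Sketch only: local.NewReader and its options are assumed to
// mirror local.NewWriter; test.asb.state is a hypothetical name.
reader, err := local.NewReader(
	context.Background(),
	local.WithDir("backups_folder"),
)
if err != nil {
	panic(err)
}

backupCfg := backup.NewDefaultBackupConfig()
backupCfg.Namespace = "test"
backupCfg.StateFile = "test.asb.state" // dump/restore scan state here
backupCfg.Continue = true              // resume instead of starting fresh

// The reader is used to load the saved state before scanning resumes.
backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, reader)
if err != nil {
	panic(err)
}
```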
1 change: 1 addition & 0 deletions client.go
@@ -218,6 +218,7 @@ func (c *Client) Backup(
if err != nil {
return nil, fmt.Errorf("failed to create backup handler: %w", err)
}

handler.run()

return handler, nil
15 changes: 14 additions & 1 deletion cmd/internal/app/asbackup.go
@@ -30,6 +30,8 @@ type ASBackup struct {
backupClient *backup.Client
backupConfig *backup.BackupConfig
writer backup.Writer
// reader is used to read the state file.
reader backup.StreamingReader
// Additional params.
isEstimate bool
estimatesSamples int64
@@ -60,6 +62,7 @@ func NewASBackup(
// Initializations.
var (
writer backup.Writer
reader backup.StreamingReader
err error
)
// We initialize a writer only if output is configured.
@@ -75,6 +78,15 @@
}
}

if backupParams.StateFileDst != "" || backupParams.Continue != "" {
r := &models.Restore{InputFile: backupParams.OutputFile}

reader, err = getReader(ctx, r, commonParams, awsS3, gcpStorage, azureBlob)
if err != nil {
return nil, fmt.Errorf("failed to create reader: %w", err)
}
}

aerospikeClient, err := newAerospikeClient(clientConfig, backupParams.PreferRacks)
if err != nil {
return nil, fmt.Errorf("failed to create aerospike client: %w", err)
@@ -100,6 +112,7 @@
backupClient: backupClient,
backupConfig: backupConfig,
writer: writer,
reader: reader,
isEstimate: backupParams.Estimate,
estimatesSamples: backupParams.EstimateSamples,
}, nil
@@ -120,7 +133,7 @@ func (b *ASBackup) Run(ctx context.Context) error {

printEstimateReport(estimates)
default:
h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer)
h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer, b.reader)
if err != nil {
return fmt.Errorf("failed to start backup: %w", err)
}
10 changes: 7 additions & 3 deletions cmd/internal/app/configs.go
@@ -59,10 +59,16 @@ func mapBackupConfig(
c.ParallelWrite = commonParams.Parallel
c.ParallelRead = commonParams.Parallel
// As we set --nice in MiB we must convert it to bytes
// TODO: make Bandwidth int64 to avoid overflow.
c.Bandwidth = commonParams.Nice * 1024 * 1024
c.Compact = backupParams.Compact
c.NoTTLOnly = backupParams.NoTTLOnly
c.StateFileDumpDuration = time.Duration(backupParams.StateFileDumpDuration) * time.Millisecond
c.StateFile = backupParams.StateFileDst

if backupParams.Continue != "" {
c.StateFile = backupParams.Continue
c.Continue = true
}

// Overwrite partitions if we use nodes.
if backupParams.ParallelNodes || backupParams.NodeList != "" {
@@ -135,7 +141,6 @@ func mapRestoreConfig(
c.WritePolicy = mapWritePolicy(restoreParams, commonParams)
c.InfoPolicy = mapInfoPolicy(restoreParams.TimeOut)
// As we set --nice in MiB we must convert it to bytes
// TODO: make Bandwidth int64 to avoid overflow.
c.Bandwidth = commonParams.Nice * 1024 * 1024
c.ExtraTTL = restoreParams.ExtraTTL
c.IgnoreRecordError = restoreParams.IgnoreRecordError
@@ -288,7 +293,6 @@ func recordExistsAction(replace, unique bool) aerospike.RecordExistsAction {
}
}

// TODO: why no info policy timeout is set for backup in C tool?
func mapInfoPolicy(timeOut int64) *aerospike.InfoPolicy {
p := aerospike.NewInfoPolicy()
p.Timeout = time.Duration(timeOut) * time.Millisecond
14 changes: 14 additions & 0 deletions cmd/internal/flags/backup.go
@@ -123,6 +123,20 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
flagSet.Int64Var(&f.EstimateSamples, "estimate-samples",
10000,
"The number of samples to take when running a backup estimate.")
flagSet.StringVarP(&f.Continue, "continue", "c",
"",
"Resumes an interrupted/failed backup from where it was left off, given the .state file\n"+
"that was generated from the interrupted/failed run.")
flagSet.StringVar(&f.StateFileDst, "state-file-dst",
"",
"Either a path with a file name or a directory in which the backup state file will be\n"+
"placed if the backup is interrupted/fails. If a path with a file name is used, that\n"+
exact path is where the state file will be placed. If a directory is given, the backup\n"+
"state will be placed in the directory with name `<namespace>.asb.state`, or\n"+
"`<prefix>.asb.state` if `--output-file-prefix` is given.")
flagSet.Int64Var(&f.StateFileDumpDuration, "state-file-dump-duration",
10000,
"Intervals in milliseconds, how often dump state file to disk.")

return flagSet
}
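For context, here is a standalone sketch of how the three new flags parse with `pflag`; the flag-set name and sample arguments are illustrative, and the real definitions live on the internal `flags.Backup` type shown above.

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	var (
		cont         string
		stateFileDst string
		dumpInterval int64
	)

	// Mirror the new backup-state flags for illustration only.
	fs := pflag.NewFlagSet("asbackup", pflag.ExitOnError)
	fs.StringVarP(&cont, "continue", "c", "", "resume from a .state file")
	fs.StringVar(&stateFileDst, "state-file-dst", "", "where to place the state file")
	fs.Int64Var(&dumpInterval, "state-file-dump-duration", 10000, "state dump interval in ms")

	// Sample command line: dump state every 5 seconds into backups_folder.
	_ = fs.Parse([]string{"--state-file-dst", "backups_folder", "--state-file-dump-duration", "5000"})
	fmt.Println(cont, stateFileDst, dumpInterval) // -> "" backups_folder 5000
}
```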
41 changes: 22 additions & 19 deletions cmd/internal/models/backup.go
@@ -15,25 +15,28 @@
package models

type Backup struct {
OutputFile string
RemoveFiles bool
ModifiedBefore string
ModifiedAfter string
FileLimit int64
AfterDigest string
MaxRecords int64
NoBins bool
SleepBetweenRetries int
FilterExpression string
ParallelNodes bool
RemoveArtifacts bool
Compact bool
NodeList string
NoTTLOnly bool
PreferRacks string
PartitionList string
Estimate bool
EstimateSamples int64
OutputFile string
RemoveFiles bool
ModifiedBefore string
ModifiedAfter string
FileLimit int64
AfterDigest string
MaxRecords int64
NoBins bool
SleepBetweenRetries int
FilterExpression string
ParallelNodes bool
RemoveArtifacts bool
Compact bool
NodeList string
NoTTLOnly bool
PreferRacks string
PartitionList string
Estimate bool
EstimateSamples int64
StateFileDst string
StateFileDumpDuration int64
Continue string
}

// ShouldClearTarget checks if we should clean the target directory.
4 changes: 2 additions & 2 deletions config_backup.go
@@ -144,12 +144,12 @@ func (c *BackupConfig) isDefaultPartitionFilter() bool {

// isStateFirstRun checks if it is the first run of a backup with a state file.
func (c *BackupConfig) isStateFirstRun() bool {
return c.StateFile != "" && c.Continue == false
return c.StateFile != "" && !c.Continue
}

// isStateContinue checks if we continue a backup from a state file.
func (c *BackupConfig) isStateContinue() bool {
return c.StateFile != "" && c.Continue == true
return c.StateFile != "" && !c.Continue
}

func (c *BackupConfig) isFullBackup() bool {
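Taken together, the two predicates split state handling into three modes: no state file, a first run that dumps state as it goes, and a resumed run. A small self-contained illustration, reducing the config to the two fields the predicates inspect:

```go
package main

import "fmt"

// cfg is a minimal stand-in for BackupConfig, reduced to the two
// fields the state predicates inspect.
type cfg struct {
	StateFile string
	Continue  bool
}

func (c *cfg) isStateFirstRun() bool { return c.StateFile != "" && !c.Continue }
func (c *cfg) isStateContinue() bool { return c.StateFile != "" && c.Continue }

func main() {
	for _, c := range []cfg{
		{},                            // plain backup: no state handling
		{StateFile: "test.asb.state"}, // first run: dump state while scanning
		{StateFile: "test.asb.state", Continue: true}, // resume from saved state
	} {
		fmt.Printf("%+v firstRun=%v continue=%v\n", c, c.isStateFirstRun(), c.isStateContinue())
	}
}
```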
2 changes: 1 addition & 1 deletion examples/aws/s3/main.go
@@ -97,7 +97,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
// set compression policy
backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20)

backupHandler, err := c.Backup(ctx, backupCfg, writers)
backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
}
2 changes: 1 addition & 1 deletion examples/azure/blob/main.go
@@ -101,7 +101,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
// set compression policy
backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20)

backupHandler, err := c.Backup(ctx, backupCfg, writers)
backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
}
2 changes: 1 addition & 1 deletion examples/gcp/storage/main.go
@@ -92,7 +92,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
// set compression policy
backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20)

backupHandler, err := c.Backup(ctx, backupCfg, writers)
backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
}
2 changes: 1 addition & 1 deletion examples/readme/main.go
@@ -51,7 +51,7 @@ func main() {
backupCfg.Namespace = "test"
backupCfg.ParallelRead = 5

backupHandler, err := backupClient.Backup(ctx, backupCfg, writers)
backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
}
17 changes: 13 additions & 4 deletions handler_backup.go
@@ -99,10 +99,18 @@ func newBackupHandler(
// redefine context cancel.
ctx, cancel := context.WithCancel(ctx)

// Keep in mind, that on continue operation, we update partitions list in config by pointer.
state, err := NewState(ctx, config, reader, writer, logger)
if err != nil {
return nil, err
var (
state *State
err error
)

if config.StateFile != "" {
// Keep in mind that on a continue operation, we update the partition list in the config by pointer.
state, err = NewState(ctx, config, reader, writer, logger)
if err != nil {
cancel()
return nil, err
}
}

return &BackupHandler{
@@ -243,6 +251,7 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error {
if err != nil {
return err
}

if bh.config.isStateContinue() {
// We have to reload the filter, because counting records moves the cursor and future scans would return nothing.
bh.config.PartitionFilters, err = bh.state.loadPartitionFilters()
5 changes: 3 additions & 2 deletions handler_backup_records.go
@@ -37,8 +37,6 @@ type backupRecordsHandler struct {
aerospikeClient AerospikeClient
logger *slog.Logger
scanLimiter *semaphore.Weighted
// is used when AfterDigest is set.
afterDigest []byte
}

func newBackupRecordsHandler(
@@ -204,7 +202,9 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkersForPartition(
ctx context.Context, n int, scanPolicy *a.ScanPolicy,
) ([]pipeline.Worker[*models.Token], error) {
var err error

partitionGroups := bh.config.PartitionFilters

if !bh.config.isStateContinue() {
partitionGroups, err = splitPartitions(bh.config.PartitionFilters, n)
if err != nil {
@@ -276,6 +276,7 @@ func (bh *backupRecordsHandler) recordReaderConfigForPartitions(
scanPolicy *a.ScanPolicy,
) *aerospike.RecordReaderConfig {
pfCopy := *partitionFilter

return aerospike.NewRecordReaderConfig(
bh.config.Namespace,
bh.config.SetList,
8 changes: 4 additions & 4 deletions internal/processors/change_namespace_test.go
@@ -45,7 +45,7 @@ func TestChangeNamespaceProcessor(t *testing.T) {
Record: &aerospike.Record{
Key: key,
},
}, 0),
}, 0, nil),
wantErr: false,
},
{
@@ -56,7 +56,7 @@
Record: &aerospike.Record{
Key: key,
},
}, 0),
}, 0, nil),
wantErr: false,
},
{
@@ -74,7 +74,7 @@
Record: &aerospike.Record{
Key: invalidKey,
},
}, 0),
}, 0, nil),
wantErr: true,
},
{
@@ -85,7 +85,7 @@
Record: &aerospike.Record{
Key: key,
},
}, 0),
}, 0, nil),
wantErr: false,
},
}
9 changes: 7 additions & 2 deletions io/aerospike/record_reader.go
@@ -141,8 +141,13 @@ func (r *RecordReader) Read() (*models.Token, error) {
rec := models.Record{
Record: res.Record,
}
// TODO: check how accurate is value of filter at this moment.
recToken := models.NewRecordToken(&rec, 0, *r.config.partitionFilter)

pfs, err := models.NewPartitionFilterSerialized(r.config.partitionFilter)
if err != nil {
return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
}

recToken := models.NewRecordToken(&rec, 0, pfs)

return recToken, nil
}
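Serializing the partition filter lets each record token carry a resumable scan cursor that can be persisted in the state file and reloaded on `--continue`. A rough sketch of the idea using `encoding/gob` and a simplified filter type; the real `models.NewPartitionFilterSerialized` may well work differently.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// filter is a simplified stand-in; the real a.PartitionFilter also
// carries the digest/cursor needed to resume a scan mid-partition.
type filter struct {
	Begin, Count int
	Digest       []byte
}

// serialize flattens the filter into bytes suitable for writing
// into a backup state file.
func serialize(f *filter) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(f); err != nil {
		return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
	}
	return buf.Bytes(), nil
}

func main() {
	b, err := serialize(&filter{Begin: 0, Count: 4096})
	if err != nil {
		panic(err)
	}
	fmt.Printf("serialized filter: %d bytes\n", len(b))
}
```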
4 changes: 2 additions & 2 deletions io/aerospike/record_reader_test.go
@@ -85,7 +85,7 @@ func (suite *readersTestSuite) TestAerospikeRecordReader() {

v, err := reader.Read()
suite.Nil(err)
expectedRecToken := models.NewRecordToken(mockRec, 0)
expectedRecToken := models.NewRecordToken(mockRec, 0, nil)
suite.Equal(expectedRecToken, v)
mockScanner.AssertExpectations(suite.T())
}
@@ -273,7 +273,7 @@ func (suite *readersTestSuite) TestAerospikeRecordReaderWithPolicy() {

v, err := reader.Read()
suite.Nil(err)
expectedRecToken := models.NewRecordToken(mockRec, 0)
expectedRecToken := models.NewRecordToken(mockRec, 0, nil)
suite.Equal(expectedRecToken, v)
mockScanner.AssertExpectations(suite.T())
}
2 changes: 1 addition & 1 deletion io/encoding/asb/decode.go
@@ -161,7 +161,7 @@ func (r *Decoder) NextToken() (*models.Token, error) {
case *models.UDF:
return models.NewUDFToken(v, size), nil
case *models.Record:
return models.NewRecordToken(v, size, a.PartitionFilter{}), nil
return models.NewRecordToken(v, size, nil), nil
default:
return nil, fmt.Errorf("unsupported token type %T", v)
}
2 changes: 1 addition & 1 deletion io/encoding/asb/decode_test.go
@@ -3718,7 +3718,7 @@ func TestASBReader_NextToken(t *testing.T) {
Generation: 10,
},
VoidTime: 10,
}, 106),
}, 106, nil),
},
{
name: "negative EOF",