diff --git a/README.md b/README.md index 39640280..057f9c6c 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ func main() { // For backup to single file use local.WithFile(fileName) writers, err := local.NewWriter( + context.Background(), local.WithRemoveFiles(), local.WithDir("backups_folder"), ) @@ -52,10 +53,11 @@ func main() { backupCfg := backup.NewDefaultBackupConfig() backupCfg.Namespace = "test" - backupCfg.Parallel = 5 + backupCfg.ParallelRead = 10 + backupCfg.ParallelWrite = 10 ctx := context.Background() - backupHandler, err := backupClient.Backup(ctx, backupCfg, writers) + backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil) if err != nil { panic(err) } diff --git a/client.go b/client.go index b100af8f..c8723ebe 100644 --- a/client.go +++ b/client.go @@ -195,10 +195,12 @@ func (c *Client) getUsableScanPolicy(p *a.ScanPolicy) *a.ScanPolicy { // - ctx can be used to cancel the backup operation. // - config is the configuration for the backup operation. // - writer creates new writers for the backup operation. +// - reader is used only for reading a state file for continuation operations. func (c *Client) Backup( ctx context.Context, config *BackupConfig, writer Writer, + reader StreamingReader, ) (*BackupHandler, error) { if config == nil { return nil, fmt.Errorf("backup config required") @@ -212,7 +214,11 @@ func (c *Client) Backup( return nil, fmt.Errorf("failed to validate backup config: %w", err) } - handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer, c.scanLimiter) + handler, err := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer, reader, c.scanLimiter) + if err != nil { + return nil, fmt.Errorf("failed to create backup handler: %w", err) + } + handler.run() return handler, nil @@ -271,7 +277,10 @@ func (c *Client) Estimate( return 0, fmt.Errorf("failed to validate backup config: %w", err) } - handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, nil, c.scanLimiter) + handler, err := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, nil, nil, c.scanLimiter) + if err != nil { + return 0, fmt.Errorf("failed to create backup handler: %w", err) + } result, err := handler.getEstimate(ctx, estimateSamples) if err != nil { diff --git a/cmd/asbackup/readme.md b/cmd/asbackup/readme.md index 22b36538..db7a77fc 100644 --- a/cmd/asbackup/readme.md +++ b/cmd/asbackup/readme.md @@ -24,6 +24,9 @@ More info here https://goreleaser.com/quick-start/ ## Supported flags ``` +Welcome to the Aerospike backup CLI tool! +----------------------------------------- + Usage: asbackup [flags] @@ -71,6 +74,7 @@ Backup Flags: there is no socket idle time limit (default 10000) -N, --nice int The limits for read/write storage bandwidth in MiB/s -o, --output-file string Backup to a single backup file. Use - for stdout. Required, unless -d or -e is used. + -q, --output-file-prefix string When using directory parameter, prepend a prefix to the names of the generated files. -r, --remove-files Remove existing backup file (-o) or files (-d). -F, --file-limit int Rotate backup files, when their size crosses the given value (in bytes) Only used when backing up to a Directory. @@ -127,6 +131,15 @@ Backup Flags: after-digest, partition-list. It calculates estimate size of full backup. --estimate-samples int The number of samples to take when running a backup estimate. 
(default 10000) + -c, --continue string Resumes an interrupted/failed backup from where it was left off, given the .state file + that was generated from the interrupted/failed run. + --state-file-dst string Name of a state file that will be saved in backup --directory. + Works only with --file-limit parameter. As we reach --file-limit and close file, + current state will be saved. Works only for default and/or partition backup. + Not work with --parallel-nodes or --node--list. + --scan-page-size int How many records will be read on one iteration for continuation backup. + Affects size if overlap on resuming backup after an error. + Is used only with --state-file-dst or --continue. (default 10000) Compression Flags: -z, --compress string Enables compressing of backup files using the specified compression algorithm. @@ -173,15 +186,6 @@ Azure Flags: ## Unsupported flags ``` ---continue Resumes an interrupted/failed backup from where it was left off, given the .state file - that was generated from the interrupted/failed run. - ---state-file-dst Either a path with a file name or a directory in which the backup state file will be - placed if the backup is interrupted/fails. If a path with a file name is used, that - exact path is where the backup file will be placed. If a directory is given, the backup - state will be placed in the directory with name `.asb.state`, or - `.asb.state` if `--output-file-prefix` is given. - --machine Output machine-readable status updates to the given path, typically a FIFO. --no-config-file Do not read any config file. Default: disabled diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go index 87a91418..e3a6c89d 100644 --- a/cmd/internal/app/asbackup.go +++ b/cmd/internal/app/asbackup.go @@ -30,6 +30,8 @@ type ASBackup struct { backupClient *backup.Client backupConfig *backup.BackupConfig writer backup.Writer + // reader is used to read state file. + reader backup.StreamingReader // Additional params. isEstimate bool estimatesSamples int64 @@ -60,11 +62,19 @@ func NewASBackup( // Initializations. var ( writer backup.Writer + reader backup.StreamingReader err error ) // We initialize a writer only if output is configured. 
if backupParams.OutputFile != "" || commonParams.Directory != "" { - writer, err = getWriter(ctx, backupParams, commonParams, awsS3, gcpStorage, azureBlob) + writer, err = getWriter( + ctx, + backupParams, + commonParams, + awsS3, + gcpStorage, + azureBlob, + ) if err != nil { return nil, fmt.Errorf("failed to create backup writer: %w", err) } @@ -75,6 +85,15 @@ func NewASBackup( } } + if backupParams.ShouldSaveState() { + r := &models.Restore{InputFile: backupParams.OutputFile} + + reader, err = getReader(ctx, r, commonParams, awsS3, gcpStorage, azureBlob, backupParams) + if err != nil { + return nil, fmt.Errorf("failed to create reader: %w", err) + } + } + aerospikeClient, err := newAerospikeClient(clientConfig, backupParams.PreferRacks) if err != nil { return nil, fmt.Errorf("failed to create aerospike client: %w", err) @@ -100,6 +119,7 @@ func NewASBackup( backupClient: backupClient, backupConfig: backupConfig, writer: writer, + reader: reader, isEstimate: backupParams.Estimate, estimatesSamples: backupParams.EstimateSamples, }, nil @@ -120,7 +140,7 @@ func (b *ASBackup) Run(ctx context.Context) error { printEstimateReport(estimates) default: - h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer) + h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer, b.reader) if err != nil { return fmt.Errorf("failed to start backup: %w", err) } diff --git a/cmd/internal/app/asrestore.go b/cmd/internal/app/asrestore.go index 2bf1e1ef..3e7c36e5 100644 --- a/cmd/internal/app/asrestore.go +++ b/cmd/internal/app/asrestore.go @@ -49,7 +49,7 @@ func NewASRestore( return nil, err } - reader, err := getReader(ctx, restoreParams, commonParams, awsS3, gcpStorage, azureBlob) + reader, err := getReader(ctx, restoreParams, commonParams, awsS3, gcpStorage, azureBlob, nil) if err != nil { return nil, fmt.Errorf("failed to create backup reader: %w", err) } @@ -108,15 +108,16 @@ func getReader( awsS3 *models.AwsS3, gcpStorage *models.GcpStorage, azureBlob *models.AzureBlob, + backupParams *models.Backup, ) (backup.StreamingReader, error) { switch { case awsS3.Region != "": - return newS3Reader(ctx, awsS3, restoreParams, commonParams) + return newS3Reader(ctx, awsS3, restoreParams, commonParams, backupParams) case gcpStorage.BucketName != "": - return newGcpReader(ctx, gcpStorage, restoreParams, commonParams) + return newGcpReader(ctx, gcpStorage, restoreParams, commonParams, backupParams) case azureBlob.ContainerName != "": - return newAzureReader(ctx, azureBlob, restoreParams, commonParams) + return newAzureReader(ctx, azureBlob, restoreParams, commonParams, backupParams) default: - return newLocalReader(restoreParams, commonParams) + return newLocalReader(restoreParams, commonParams, backupParams) } } diff --git a/cmd/internal/app/configs.go b/cmd/internal/app/configs.go index 93800065..6bdc9fe5 100644 --- a/cmd/internal/app/configs.go +++ b/cmd/internal/app/configs.go @@ -59,10 +59,23 @@ func mapBackupConfig( c.ParallelWrite = commonParams.Parallel c.ParallelRead = commonParams.Parallel // As we set --nice in MiB we must convert it to bytes - // TODO: make Bandwidth int64 to avoid overflow. 
c.Bandwidth = commonParams.Nice * 1024 * 1024 c.Compact = backupParams.Compact c.NoTTLOnly = backupParams.NoTTLOnly + c.OutputFilePrefix = backupParams.OutputFilePrefix + + if backupParams.Continue != "" { + c.StateFile = backupParams.Continue + c.Continue = true + c.SyncPipelines = true + c.PageSize = backupParams.ScanPageSize + } + + if backupParams.StateFileDst != "" { + c.StateFile = backupParams.StateFileDst + c.SyncPipelines = true + c.PageSize = backupParams.ScanPageSize + } // Overwrite partitions if we use nodes. if backupParams.ParallelNodes || backupParams.NodeList != "" { @@ -135,7 +148,6 @@ func mapRestoreConfig( c.WritePolicy = mapWritePolicy(restoreParams, commonParams) c.InfoPolicy = mapInfoPolicy(restoreParams.TimeOut) // As we set --nice in MiB we must convert it to bytes - // TODO: make Bandwidth int64 to avoid overflow. c.Bandwidth = commonParams.Nice * 1024 * 1024 c.ExtraTTL = restoreParams.ExtraTTL c.IgnoreRecordError = restoreParams.IgnoreRecordError @@ -288,7 +300,6 @@ func recordExistsAction(replace, unique bool) aerospike.RecordExistsAction { } } -// TODO: why no info policy timeout is set for backup in C tool? func mapInfoPolicy(timeOut int64) *aerospike.InfoPolicy { p := aerospike.NewInfoPolicy() p.Timeout = time.Duration(timeOut) * time.Millisecond diff --git a/cmd/internal/app/readers.go b/cmd/internal/app/readers.go index 5e53e96e..b65ca5a7 100644 --- a/cmd/internal/app/readers.go +++ b/cmd/internal/app/readers.go @@ -26,11 +26,16 @@ import ( "github.com/aerospike/backup-go/io/local" ) -func newLocalReader(r *models.Restore, c *models.Common) (backup.StreamingReader, error) { +func newLocalReader(r *models.Restore, c *models.Common, b *models.Backup) (backup.StreamingReader, error) { var opts []local.Opt if c.Directory != "" && r.InputFile == "" { - opts = append(opts, local.WithDir(c.Directory), local.WithValidator(asb.NewValidator())) + opts = append(opts, local.WithDir(c.Directory)) + // Append Validator only if backup params are not set. + // That means we don't need to check that we are saving a state file. + if b == nil { + opts = append(opts, local.WithValidator(asb.NewValidator())) + } } if r.InputFile != "" && c.Directory == "" { @@ -45,6 +50,7 @@ func newS3Reader( a *models.AwsS3, r *models.Restore, c *models.Common, + b *models.Backup, ) (backup.StreamingReader, error) { client, err := newS3Client(ctx, a) if err != nil { @@ -57,7 +63,12 @@ func newS3Reader( if c.Directory != "" && r.InputFile == "" { bucketName, path = getBucketFromPath(c.Directory) - opts = append(opts, s3.WithDir(path), s3.WithValidator(asb.NewValidator())) + opts = append(opts, s3.WithDir(path)) + // Append Validator only if backup params are not set. + // That means we don't need to check that we are saving a state file. + if b == nil { + opts = append(opts, s3.WithValidator(asb.NewValidator())) + } } if r.InputFile != "" && c.Directory == "" { @@ -73,6 +84,7 @@ func newGcpReader( g *models.GcpStorage, r *models.Restore, c *models.Common, + b *models.Backup, ) (backup.StreamingReader, error) { client, err := newGcpClient(ctx, g) if err != nil { @@ -82,7 +94,12 @@ func newGcpReader( opts := make([]storage.Opt, 0) if c.Directory != "" && r.InputFile == "" { - opts = append(opts, storage.WithDir(c.Directory), storage.WithValidator(asb.NewValidator())) + opts = append(opts, storage.WithDir(c.Directory)) + // Append Validator only if backup params are not set. + // That means we don't need to check that we are saving a state file. 
+ if b == nil { + opts = append(opts, storage.WithValidator(asb.NewValidator())) + } } if r.InputFile != "" && c.Directory == "" { @@ -97,6 +114,7 @@ func newAzureReader( a *models.AzureBlob, r *models.Restore, c *models.Common, + b *models.Backup, ) (backup.StreamingReader, error) { client, err := newAzureClient(a) if err != nil { @@ -106,7 +124,12 @@ func newAzureReader( opts := make([]blob.Opt, 0) if c.Directory != "" && r.InputFile == "" { - opts = append(opts, blob.WithDir(c.Directory), blob.WithValidator(asb.NewValidator())) + opts = append(opts, blob.WithDir(c.Directory)) + // Append Validator only if backup params are not set. + // That means we don't need to check that we are saving a state file. + if b == nil { + opts = append(opts, blob.WithValidator(asb.NewValidator())) + } } if r.InputFile != "" && c.Directory == "" { diff --git a/cmd/internal/app/readers_test.go b/cmd/internal/app/readers_test.go index 3435fe59..b03bcd37 100644 --- a/cmd/internal/app/readers_test.go +++ b/cmd/internal/app/readers_test.go @@ -28,8 +28,9 @@ func TestNewLocalReader(t *testing.T) { c := &models.Common{ Directory: t.TempDir(), } + b := &models.Backup{} - reader, err := newLocalReader(r, c) + reader, err := newLocalReader(r, c, b) assert.NoError(t, err) assert.NotNil(t, reader) assert.Equal(t, testLocalType, reader.GetType()) @@ -39,13 +40,13 @@ func TestNewLocalReader(t *testing.T) { } c = &models.Common{} - reader, err = newLocalReader(r, c) + reader, err = newLocalReader(r, c, b) assert.NoError(t, err) assert.NotNil(t, reader) assert.Equal(t, testLocalType, reader.GetType()) r = &models.Restore{} - reader, err = newLocalReader(r, c) + reader, err = newLocalReader(r, c, b) assert.Error(t, err) assert.Nil(t, reader) } @@ -59,6 +60,7 @@ func TestNewS3Reader(t *testing.T) { c := &models.Common{ Directory: "asbackup/" + t.TempDir(), } + b := &models.Backup{} s3cfg := &models.AwsS3{ Region: testS3Region, @@ -68,7 +70,7 @@ func TestNewS3Reader(t *testing.T) { ctx := context.Background() - writer, err := newS3Reader(ctx, s3cfg, r, c) + writer, err := newS3Reader(ctx, s3cfg, r, c, b) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testS3Type, writer.GetType()) @@ -78,7 +80,7 @@ func TestNewS3Reader(t *testing.T) { } c = &models.Common{} - writer, err = newS3Reader(ctx, s3cfg, r, c) + writer, err = newS3Reader(ctx, s3cfg, r, c, b) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testS3Type, writer.GetType()) @@ -93,6 +95,7 @@ func TestNewGcpReader(t *testing.T) { c := &models.Common{ Directory: t.TempDir(), } + b := &models.Backup{} cfg := &models.GcpStorage{ BucketName: testBucket, @@ -101,7 +104,7 @@ func TestNewGcpReader(t *testing.T) { ctx := context.Background() - writer, err := newGcpReader(ctx, cfg, r, c) + writer, err := newGcpReader(ctx, cfg, r, c, b) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testGcpType, writer.GetType()) @@ -111,7 +114,7 @@ func TestNewGcpReader(t *testing.T) { } c = &models.Common{} - writer, err = newGcpReader(ctx, cfg, r, c) + writer, err = newGcpReader(ctx, cfg, r, c, b) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testGcpType, writer.GetType()) @@ -126,6 +129,7 @@ func TestNewAzureReader(t *testing.T) { c := &models.Common{ Directory: t.TempDir(), } + b := &models.Backup{} cfg := &models.AzureBlob{ AccountName: testAzureAccountName, @@ -136,7 +140,7 @@ func TestNewAzureReader(t *testing.T) { ctx := context.Background() - writer, err := newAzureReader(ctx, cfg, r, c) + writer, err := newAzureReader(ctx, 
cfg, r, c, b) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testAzureType, writer.GetType()) @@ -146,7 +150,7 @@ func TestNewAzureReader(t *testing.T) { } c = &models.Common{} - writer, err = newAzureReader(ctx, cfg, r, c) + writer, err = newAzureReader(ctx, cfg, r, c, b) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testAzureType, writer.GetType()) diff --git a/cmd/internal/app/writers.go b/cmd/internal/app/writers.go index 92051dad..6f355cb3 100644 --- a/cmd/internal/app/writers.go +++ b/cmd/internal/app/writers.go @@ -42,6 +42,10 @@ func newLocalWriter(ctx context.Context, b *models.Backup, c *models.Common) (ba opts = append(opts, local.WithRemoveFiles()) } + if b.Continue != "" { + opts = append(opts, local.WithSkipDirCheck()) + } + opts = append(opts, local.WithValidator(asb.NewValidator())) return local.NewWriter(ctx, opts...) @@ -76,6 +80,10 @@ func newS3Writer( opts = append(opts, s3.WithRemoveFiles()) } + if b.Continue != "" { + opts = append(opts, s3.WithSkipDirCheck()) + } + opts = append(opts, s3.WithValidator(asb.NewValidator())) return s3.NewWriter(ctx, client, bucketName, opts...) @@ -106,6 +114,10 @@ func newGcpWriter( opts = append(opts, storage.WithRemoveFiles()) } + if b.Continue != "" { + opts = append(opts, storage.WithSkipDirCheck()) + } + opts = append(opts, storage.WithValidator(asb.NewValidator())) return storage.NewWriter(ctx, client, g.BucketName, opts...) @@ -136,6 +148,10 @@ func newAzureWriter( opts = append(opts, blob.WithRemoveFiles()) } + if b.Continue != "" { + opts = append(opts, blob.WithSkipDirCheck()) + } + opts = append(opts, blob.WithValidator(asb.NewValidator())) return blob.NewWriter(ctx, client, a.ContainerName, opts...) diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go index 5d7fb0df..166fb8fc 100644 --- a/cmd/internal/flags/backup.go +++ b/cmd/internal/flags/backup.go @@ -33,6 +33,9 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet { flagSet.StringVarP(&f.OutputFile, "output-file", "o", "", "Backup to a single backup file. Use - for stdout. Required, unless -d or -e is used.") + flagSet.StringVarP(&f.OutputFilePrefix, "output-file-prefix", "q", + "", + "When using directory parameter, prepend a prefix to the names of the generated files.") flagSet.BoolVarP(&f.RemoveFiles, "remove-files", "r", false, "Remove existing backup file (-o) or files (-d).") @@ -123,6 +126,21 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet { flagSet.Int64Var(&f.EstimateSamples, "estimate-samples", 10000, "The number of samples to take when running a backup estimate.") + flagSet.StringVarP(&f.Continue, "continue", "c", + "", + "Resumes an interrupted/failed backup from where it was left off, given the .state file\n"+ + "that was generated from the interrupted/failed run.") + flagSet.StringVar(&f.StateFileDst, "state-file-dst", + "", + "Name of a state file that will be saved in backup --directory.\n"+ + "Works only with --file-limit parameter. As we reach --file-limit and close file,\n"+ + "current state will be saved. Works only for default and/or partition backup. 
\n"+ + "Not work with --parallel-nodes or --node--list.") + flagSet.Int64Var(&f.ScanPageSize, "scan-page-size", + 10000, + "How many records will be read on one iteration for continuation backup.\n"+ + "Affects size if overlap on resuming backup after an error.\n"+ + "Is used only with --state-file-dst or --continue.") return flagSet } diff --git a/cmd/internal/models/backup.go b/cmd/internal/models/backup.go index d3ed6b3e..7d8205b3 100644 --- a/cmd/internal/models/backup.go +++ b/cmd/internal/models/backup.go @@ -34,9 +34,17 @@ type Backup struct { PartitionList string Estimate bool EstimateSamples int64 + StateFileDst string + Continue string + ScanPageSize int64 + OutputFilePrefix string } // ShouldClearTarget check if we should clean target directory. func (b *Backup) ShouldClearTarget() bool { - return b.RemoveFiles || b.RemoveArtifacts + return (b.RemoveFiles || b.RemoveArtifacts) && b.Continue == "" +} + +func (b *Backup) ShouldSaveState() bool { + return b.StateFileDst != "" || b.Continue != "" } diff --git a/config_backup.go b/config_backup.go index cd0c4a24..ec2163a4 100644 --- a/config_backup.go +++ b/config_backup.go @@ -104,6 +104,26 @@ type BackupConfig struct { Compact bool // Only include records that have no ttl set (persistent records). NoTTLOnly bool + // Name of a state file that will be saved in backup directory. + // Works only with FileLimit parameter. + // As we reach FileLimit and close file, the current state will be saved. + // Works only for default and/or partition backup. + // Not work with ParallelNodes or NodeList. + StateFile string + // Resumes an interrupted/failed backup from where it was left off, given the .state file + // that was generated from the interrupted/failed run. + // Works only for default and/or partition backup. Not work with ParallelNodes or NodeList. + Continue bool + // How many records will be read on one iteration for continuation backup. + // Affects size if overlap on resuming backup after an error. + // By default, it must be zero. If any value is set, reading from Aerospike will be paginated. + // Which affects the performance and RAM usage. + PageSize int64 + // If set to true, the same number of workers will be created for each stage of the pipeline. + // Each worker will be connected to the next stage worker with a separate unbuffered channel. + SyncPipelines bool + // When using directory parameter, prepend a prefix to the names of the generated files. + OutputFilePrefix string } // NewDefaultBackupConfig returns a new BackupConfig with default values. @@ -117,6 +137,7 @@ func NewDefaultBackupConfig() *BackupConfig { } } +// isParalleledByNodes checks if backup is parallel by nodes. func (c *BackupConfig) isParalleledByNodes() bool { return c.ParallelNodes || len(c.NodeList) > 0 } @@ -129,11 +150,22 @@ func (c *BackupConfig) isDefaultPartitionFilter() bool { c.PartitionFilters[0].Digest == nil } +// isStateFirstRun checks if it is first run of backup with a state file. +func (c *BackupConfig) isStateFirstRun() bool { + return c.StateFile != "" && !c.Continue +} + +// isStateContinueRun checks if we continue backup from a state file. +func (c *BackupConfig) isStateContinue() bool { + return c.StateFile != "" && c.Continue +} + func (c *BackupConfig) isFullBackup() bool { // full backup doesn't have a lower bound. return c.ModAfter == nil && c.isDefaultPartitionFilter() } +//nolint:gocyclo // validate func is long func with a lot of checks. 
func (c *BackupConfig) validate() error { if c.ParallelRead < MinParallel || c.ParallelRead > MaxParallel { return fmt.Errorf("parallel read must be between 1 and 1024, got %d", c.ParallelRead) @@ -169,6 +201,22 @@ func (c *BackupConfig) validate() error { return fmt.Errorf("filelimit value must not be negative, got %d", c.FileLimit) } + if c.StateFile != "" && c.PageSize == 0 { + return fmt.Errorf("page size must be set if saving state to state file is enabled") + } + + if c.StateFile != "" && c.FileLimit == 0 { + return fmt.Errorf("file limit must be set if saving state to state file is enabled") + } + + if c.Continue && c.StateFile == "" { + return fmt.Errorf("state file must be set if continue is enabled") + } + + if c.StateFile != "" && !c.SyncPipelines { + return fmt.Errorf("sync pipelines must be enabled if stage file is set") + } + if err := c.CompressionPolicy.validate(); err != nil { return fmt.Errorf("compression policy invalid: %w", err) } diff --git a/policy_compression.go b/config_policy_compression.go similarity index 100% rename from policy_compression.go rename to config_policy_compression.go diff --git a/policy_encryption.go b/config_policy_encryption.go similarity index 100% rename from policy_encryption.go rename to config_policy_encryption.go diff --git a/examples/aws/s3/main.go b/examples/aws/s3/main.go index da2f44d2..aa38b5c3 100644 --- a/examples/aws/s3/main.go +++ b/examples/aws/s3/main.go @@ -97,7 +97,7 @@ func runBackup(ctx context.Context, c *backup.Client) { // set compression policy backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20) - backupHandler, err := c.Backup(ctx, backupCfg, writers) + backupHandler, err := c.Backup(ctx, backupCfg, writers, nil) if err != nil { panic(err) } diff --git a/examples/azure/blob/main.go b/examples/azure/blob/main.go index 8dbe34b5..1f3b2af3 100644 --- a/examples/azure/blob/main.go +++ b/examples/azure/blob/main.go @@ -101,7 +101,7 @@ func runBackup(ctx context.Context, c *backup.Client) { // set compression policy backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20) - backupHandler, err := c.Backup(ctx, backupCfg, writers) + backupHandler, err := c.Backup(ctx, backupCfg, writers, nil) if err != nil { panic(err) } diff --git a/examples/gcp/storage/main.go b/examples/gcp/storage/main.go index 82e431a8..f3bf7fb6 100644 --- a/examples/gcp/storage/main.go +++ b/examples/gcp/storage/main.go @@ -92,7 +92,7 @@ func runBackup(ctx context.Context, c *backup.Client) { // set compression policy backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20) - backupHandler, err := c.Backup(ctx, backupCfg, writers) + backupHandler, err := c.Backup(ctx, backupCfg, writers, nil) if err != nil { panic(err) } diff --git a/examples/readme/main.go b/examples/readme/main.go index 73646be9..1e0d2635 100644 --- a/examples/readme/main.go +++ b/examples/readme/main.go @@ -51,7 +51,7 @@ func main() { backupCfg.Namespace = "test" backupCfg.ParallelRead = 5 - backupHandler, err := backupClient.Backup(ctx, backupCfg, writers) + backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil) if err != nil { panic(err) } diff --git a/handler_backup.go b/handler_backup.go index d298ed4b..1b586d53 100644 --- a/handler_backup.go +++ b/handler_backup.go @@ -71,6 +71,8 @@ type BackupHandler struct { id string stats models.BackupStats + // Backup state for continuation. + state *State } // newBackupHandler creates a new BackupHandler. 
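For reference, a minimal sketch of how a caller might enable state saving on a first run, based on the new `BackupConfig` fields and validation rules above: `StateFile` requires a non-zero `PageSize` and `FileLimit`, `SyncPipelines` must be enabled, and the new `reader` argument to `Backup` is only needed when `Continue` is set. The client and writer setup, the state file name, and the `Wait` call are assumed from the README example.

```go
package main

import (
	"context"

	"github.com/aerospike/backup-go"
)

// runStatefulBackup sketches a first run that saves backup state. backupClient
// and writers are assumed to be created as in the README example.
func runStatefulBackup(ctx context.Context, backupClient *backup.Client, writers backup.Writer) error {
	cfg := backup.NewDefaultBackupConfig()
	cfg.Namespace = "test"
	cfg.StateFile = "backup.asb.state" // illustrative name, saved into the backup directory
	cfg.SyncPipelines = true           // required by validate() when StateFile is set
	cfg.PageSize = 10_000              // required by validate(); switches scans to paginated mode
	cfg.FileLimit = 100 * 1024 * 1024  // required by validate(); state is saved on file rotation

	// The reader argument is only used to load an existing state file when
	// Continue is true, so nil is fine on the first run.
	handler, err := backupClient.Backup(ctx, cfg, writers, nil)
	if err != nil {
		return err
	}

	return handler.Wait(ctx)
}
```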
@@ -80,8 +82,9 @@ func newBackupHandler( ac AerospikeClient, logger *slog.Logger, writer Writer, + reader StreamingReader, scanLimiter *semaphore.Weighted, -) *BackupHandler { +) (*BackupHandler, error) { id := uuid.NewString() // For estimates calculations, a writer will be nil. storageType := "" @@ -96,6 +99,20 @@ func newBackupHandler( // redefine context cancel. ctx, cancel := context.WithCancel(ctx) + var ( + state *State + err error + ) + + if config.StateFile != "" { + // Keep in mind, that on continue operation, we update partitions list in config by pointer. + state, err = NewState(ctx, config, reader, writer, logger) + if err != nil { + cancel() + return nil, err + } + } + return &BackupHandler{ ctx: ctx, cancel: cancel, @@ -109,7 +126,8 @@ func newBackupHandler( limiter: limiter, infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy), scanLimiter: scanLimiter, - } + state: state, + }, nil } // run runs the backup job. @@ -167,7 +185,7 @@ func (bh *BackupHandler) getEstimateSamples(ctx context.Context, recordsNumber i scanPolicy.RawCDT = true nodes := bh.aerospikeClient.GetNodes() - handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter) + handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter, bh.state) readerConfig := handler.recordReaderConfigForNode(nodes, &scanPolicy) recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger) @@ -227,13 +245,21 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error { writeWorkers := bh.makeWriteWorkers(backupWriters) - handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter) + handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter, bh.state) bh.stats.TotalRecords, err = handler.countRecords(ctx, bh.infoClient) if err != nil { return err } + if bh.config.isStateContinue() { + // Have to reload filter, as on count records cursor is moving and future scans returns nothing. + bh.config.PartitionFilters, err = bh.state.loadPartitionFilters() + if err != nil { + return err + } + } + return handler.run(ctx, writeWorkers, &bh.stats.ReadRecords) } @@ -243,7 +269,11 @@ func (bh *BackupHandler) makeWriteWorkers( writeWorkers := make([]pipeline.Worker[*models.Token], len(backupWriters)) for i, w := range backupWriters { - var dataWriter pipeline.DataWriter[*models.Token] = newTokenWriter(bh.encoder, w, bh.logger) + var dataWriter pipeline.DataWriter[*models.Token] = newTokenWriter(bh.encoder, w, bh.logger, nil, i) + if bh.state != nil { + dataWriter = newTokenWriter(bh.encoder, w, bh.logger, bh.state.RecordsStateChan, i) + } + dataWriter = newWriterWithTokenStats(dataWriter, &bh.stats, bh.logger) writeWorkers[i] = pipeline.NewWriteWorker(dataWriter, bh.limiter) } @@ -255,7 +285,7 @@ func (bh *BackupHandler) makeWriters(ctx context.Context, n int) ([]io.WriteClos backupWriters := make([]io.WriteCloser, n) for i := 0; i < n; i++ { - writer, err := bh.newWriter(ctx) + writer, err := bh.newWriter(ctx, i) if err != nil { return nil, err } @@ -278,16 +308,26 @@ func closeWriters(backupWriters []io.WriteCloser, logger *slog.Logger) { // If FileLimit is set, it returns a sized writer limited to FileLimit bytes. // The returned writer may be compressed or encrypted depending on the BackupHandler's // configuration. 
-func (bh *BackupHandler) newWriter(ctx context.Context) (io.WriteCloser, error) { +func (bh *BackupHandler) newWriter(ctx context.Context, n int) (io.WriteCloser, error) { if bh.config.FileLimit > 0 { - return sized.NewWriter(ctx, bh.config.FileLimit, bh.newConfiguredWriter) + // For saving state operation, we init writer with a communication channel. + if bh.config.isStateFirstRun() || bh.config.isStateContinue() { + return sized.NewWriter(ctx, n, bh.state.SaveCommandChan, bh.config.FileLimit, bh.newConfiguredWriter) + } + + return sized.NewWriter(ctx, n, nil, bh.config.FileLimit, bh.newConfiguredWriter) } return bh.newConfiguredWriter(ctx) } func (bh *BackupHandler) newConfiguredWriter(ctx context.Context) (io.WriteCloser, error) { - filename := bh.encoder.GenerateFilename() + suffix := "" + if bh.state != nil { + suffix = bh.state.getFileSuffix() + } + + filename := bh.encoder.GenerateFilename(bh.config.OutputFilePrefix, suffix) storageWriter, err := bh.writer.NewWriter(ctx, filename) if err != nil { @@ -411,14 +451,30 @@ func (bh *BackupHandler) backupSIndexes( reader := aerospike.NewSIndexReader(bh.infoClient, bh.config.Namespace, bh.logger) sindexReadWorker := pipeline.NewReadWorker[*models.Token](reader) - sindexWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger)) + sindexWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, nil, -1)) + if bh.state != nil { + sindexWriter = pipeline.DataWriter[*models.Token]( + newTokenWriter( + bh.encoder, + writer, + bh.logger, + bh.state.RecordsStateChan, + -1, + ), + ) + } + sindexWriter = newWriterWithTokenStats(sindexWriter, &bh.stats, bh.logger) sindexWriteWorker := pipeline.NewWriteWorker(sindexWriter, bh.limiter) - sindexPipeline := pipeline.NewPipeline[*models.Token]( + sindexPipeline, err := pipeline.NewPipeline[*models.Token]( + bh.config.SyncPipelines, []pipeline.Worker[*models.Token]{sindexReadWorker}, []pipeline.Worker[*models.Token]{sindexWriteWorker}, ) + if err != nil { + return err + } return sindexPipeline.Run(ctx) } @@ -430,14 +486,31 @@ func (bh *BackupHandler) backupUDFs( reader := aerospike.NewUDFReader(bh.infoClient, bh.logger) udfReadWorker := pipeline.NewReadWorker[*models.Token](reader) - udfWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger)) + udfWriter := pipeline.DataWriter[*models.Token](newTokenWriter(bh.encoder, writer, bh.logger, nil, -1)) + + if bh.state != nil { + udfWriter = pipeline.DataWriter[*models.Token]( + newTokenWriter( + bh.encoder, + writer, + bh.logger, + bh.state.RecordsStateChan, + -1, + ), + ) + } + udfWriter = newWriterWithTokenStats(udfWriter, &bh.stats, bh.logger) udfWriteWorker := pipeline.NewWriteWorker(udfWriter, bh.limiter) - udfPipeline := pipeline.NewPipeline[*models.Token]( + udfPipeline, err := pipeline.NewPipeline[*models.Token]( + bh.config.SyncPipelines, []pipeline.Worker[*models.Token]{udfReadWorker}, []pipeline.Worker[*models.Token]{udfWriteWorker}, ) + if err != nil { + return err + } return udfPipeline.Run(ctx) } diff --git a/handler_backup_records.go b/handler_backup_records.go index c33ba35e..e7d10fa9 100644 --- a/handler_backup_records.go +++ b/handler_backup_records.go @@ -37,8 +37,7 @@ type backupRecordsHandler struct { aerospikeClient AerospikeClient logger *slog.Logger scanLimiter *semaphore.Weighted - // is used when AfterDigest is set. 
- afterDigest []byte + state *State } func newBackupRecordsHandler( @@ -46,6 +45,7 @@ func newBackupRecordsHandler( ac AerospikeClient, logger *slog.Logger, scanLimiter *semaphore.Weighted, + state *State, ) *backupRecordsHandler { logger.Debug("created new backup records handler") @@ -54,6 +54,7 @@ func newBackupRecordsHandler( aerospikeClient: ac, logger: logger, scanLimiter: scanLimiter, + state: state, } return h @@ -74,9 +75,14 @@ func (bh *backupRecordsHandler) run( processors.NewVoidTimeSetter(bh.logger), processors.NewTPSLimiter[*models.Token]( ctx, bh.config.RecordsPerSecond), - )) + ), bh.config.ParallelRead) - return pipeline.NewPipeline(readWorkers, composeProcessor, writers).Run(ctx) + pl, err := pipeline.NewPipeline(bh.config.SyncPipelines, readWorkers, composeProcessor, writers) + if err != nil { + return err + } + + return pl.Run(ctx) } func (bh *backupRecordsHandler) countRecords(ctx context.Context, infoClient *asinfo.InfoClient) (uint64, error) { @@ -203,24 +209,30 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkers( func (bh *backupRecordsHandler) makeAerospikeReadWorkersForPartition( ctx context.Context, n int, scanPolicy *a.ScanPolicy, ) ([]pipeline.Worker[*models.Token], error) { - partitionGroups, err := splitPartitions(bh.config.PartitionFilters, n) - if err != nil { - return nil, err + var err error + + partitionGroups := bh.config.PartitionFilters + + if !bh.config.isStateContinue() { + partitionGroups, err = splitPartitions(bh.config.PartitionFilters, n) + if err != nil { + return nil, err + } + + if bh.config.isStateFirstRun() { + // Init state. + if err := bh.state.InitState(partitionGroups); err != nil { + return nil, err + } + } } // If we have multiply partition filters, we shrink workers to number of filters. - n = len(partitionGroups) - - readWorkers := make([]pipeline.Worker[*models.Token], n) + readWorkers := make([]pipeline.Worker[*models.Token], len(partitionGroups)) - for i := 0; i < n; i++ { + for i := range partitionGroups { recordReaderConfig := bh.recordReaderConfigForPartitions(partitionGroups[i], scanPolicy) - // For the first partition in the list, we start from digest if it is set. - if bh.afterDigest != nil && i == 0 { - recordReaderConfig = bh.recordReaderConfigForPartitions(partitionGroups[i], scanPolicy) - } - recordReader := aerospike.NewRecordReader( ctx, bh.aerospikeClient, @@ -278,10 +290,12 @@ func (bh *backupRecordsHandler) recordReaderConfigForPartitions( partitionFilter *a.PartitionFilter, scanPolicy *a.ScanPolicy, ) *aerospike.RecordReaderConfig { + pfCopy := *partitionFilter + return aerospike.NewRecordReaderConfig( bh.config.Namespace, bh.config.SetList, - partitionFilter, + &pfCopy, nil, scanPolicy, bh.config.BinList, @@ -291,6 +305,7 @@ func (bh *backupRecordsHandler) recordReaderConfigForPartitions( }, bh.scanLimiter, bh.config.NoTTLOnly, + bh.config.PageSize, ) } @@ -311,5 +326,6 @@ func (bh *backupRecordsHandler) recordReaderConfigForNode( }, bh.scanLimiter, bh.config.NoTTLOnly, + bh.config.PageSize, ) } diff --git a/handler_restore.go b/handler_restore.go index 0ed2ee29..b5ca0375 100644 --- a/handler_restore.go +++ b/handler_restore.go @@ -39,6 +39,11 @@ type StreamingReader interface { // Must be run in a goroutine `go rh.reader.StreamFiles(ctx, readersCh, errorsCh)`. StreamFiles(context.Context, chan<- io.ReadCloser, chan<- error) + // StreamFile creates a single file reader and sends io.Readers to the `readersCh` + // In case of an error, it is sent to the `errorsCh` channel. 
+ // Must be run in a goroutine `go rh.reader.StreamFile()`. + StreamFile(ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) + // GetType returns the type of storage. Used in logging. GetType() string } @@ -224,9 +229,14 @@ func (rh *RestoreHandler) runRestorePipeline(ctx context.Context, readers []pipe processors.NewChangeNamespace(nsSource, nsDest), processors.NewExpirationSetter(&rh.stats.RecordsExpired, rh.config.ExtraTTL, rh.logger), processors.NewTPSLimiter[*models.Token](ctx, rh.config.RecordsPerSecond), - )) + ), rh.config.Parallel) + + pl, err := pipeline.NewPipeline(false, readers, composeProcessor, writeWorkers) + if err != nil { + return err + } - return pipeline.NewPipeline(readers, composeProcessor, writeWorkers).Run(ctx) + return pl.Run(ctx) } func (rh *RestoreHandler) useBatchWrites() (bool, error) { @@ -239,7 +249,16 @@ func (rh *RestoreHandler) useBatchWrites() (bool, error) { return infoClient.SupportsBatchWrite() } -func newTokenWorker(processor processors.TokenProcessor) []pipeline.Worker[*models.Token] { +func newTokenWorker(processor processors.TokenProcessor, parallel int) []pipeline.Worker[*models.Token] { + if parallel > 0 { + workers := make([]pipeline.Worker[*models.Token], 0, parallel) + for i := 0; i < parallel; i++ { + workers = append(workers, pipeline.NewProcessorWorker(processor)) + } + + return workers + } + return []pipeline.Worker[*models.Token]{ pipeline.NewProcessorWorker(processor), } diff --git a/internal/processors/change_namespace_test.go b/internal/processors/change_namespace_test.go index 7c5aa8e4..cb250a61 100644 --- a/internal/processors/change_namespace_test.go +++ b/internal/processors/change_namespace_test.go @@ -45,7 +45,7 @@ func TestChangeNamespaceProcessor(t *testing.T) { Record: &aerospike.Record{ Key: key, }, - }, 0), + }, 0, nil), wantErr: false, }, { @@ -56,7 +56,7 @@ func TestChangeNamespaceProcessor(t *testing.T) { Record: &aerospike.Record{ Key: key, }, - }, 0), + }, 0, nil), wantErr: false, }, { @@ -74,7 +74,7 @@ func TestChangeNamespaceProcessor(t *testing.T) { Record: &aerospike.Record{ Key: invalidKey, }, - }, 0), + }, 0, nil), wantErr: true, }, { @@ -85,7 +85,7 @@ func TestChangeNamespaceProcessor(t *testing.T) { Record: &aerospike.Record{ Key: key, }, - }, 0), + }, 0, nil), wantErr: false, }, } diff --git a/io/aerospike/record_reader.go b/io/aerospike/record_reader.go index 3cb50b53..72c41d74 100644 --- a/io/aerospike/record_reader.go +++ b/io/aerospike/record_reader.go @@ -39,6 +39,10 @@ type RecordReaderConfig struct { setList []string binList []string noTTLOnly bool + + // pageSize used for paginated scan for saving reading state. + // If pageSize = 0, we think that we use normal scan. + pageSize int64 } // NewRecordReaderConfig creates a new RecordReaderConfig. @@ -51,6 +55,7 @@ func NewRecordReaderConfig(namespace string, timeBounds models.TimeBounds, scanLimiter *semaphore.Weighted, noTTLOnly bool, + pageSize int64, ) *RecordReaderConfig { return &RecordReaderConfig{ namespace: namespace, @@ -62,6 +67,7 @@ func NewRecordReaderConfig(namespace string, timeBounds: timeBounds, scanLimiter: scanLimiter, noTTLOnly: noTTLOnly, + pageSize: pageSize, } } @@ -95,6 +101,8 @@ type RecordReader struct { logger *slog.Logger config *RecordReaderConfig scanResult *recordSets // initialized on first Read() call + // pageRecordsChan chan is initialized only if pageSize > 0. + pageRecordsChan chan *pageRecord } // NewRecordReader creates a new RecordReader. 
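The new `StreamFile` method on `StreamingReader` follows the same channel contract as `StreamFiles`: it must be launched in a goroutine and reports results on `readersCh` and failures on `errorsCh`. Below is a hedged sketch of fetching a single file (such as a state file) through it; the helper name and file name are illustrative, not part of the change.

```go
package main

import (
	"context"
	"fmt"
	"io"

	"github.com/aerospike/backup-go"
)

// readSingleFile sketches the StreamFile contract: run it in a goroutine and
// wait for either a reader or an error. The helper itself is illustrative only.
func readSingleFile(ctx context.Context, reader backup.StreamingReader, filename string) (io.ReadCloser, error) {
	readersCh := make(chan io.ReadCloser)
	errorsCh := make(chan error)

	// Per the interface comment, StreamFile must be run in a goroutine.
	go reader.StreamFile(ctx, filename, readersCh, errorsCh)

	select {
	case err := <-errorsCh:
		return nil, err
	case rc, ok := <-readersCh:
		if !ok {
			return nil, fmt.Errorf("no reader received for %s", filename)
		}

		return rc, nil
	}
}
```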
@@ -108,16 +116,27 @@ func NewRecordReader( logger = logging.WithReader(logger, id, logging.ReaderTypeRecord) logger.Debug("created new aerospike record reader") - return &RecordReader{ + r := &RecordReader{ ctx: ctx, config: cfg, client: client, logger: logger, } + + return r } // Read reads the next record from the Aerospike database. func (r *RecordReader) Read() (*models.Token, error) { + // If pageSize is set, we use paginated read. + if r.config.pageSize > 0 { + return r.readPage() + } + + return r.read() +} + +func (r *RecordReader) read() (*models.Token, error) { if !r.isScanStarted() { scan, err := r.startScan() if err != nil { @@ -141,7 +160,8 @@ func (r *RecordReader) Read() (*models.Token, error) { rec := models.Record{ Record: res.Record, } - recToken := models.NewRecordToken(&rec, 0) + + recToken := models.NewRecordToken(&rec, 0, nil) return recToken, nil } diff --git a/io/aerospike/record_reader_paginated.go b/io/aerospike/record_reader_paginated.go new file mode 100644 index 00000000..a4d60963 --- /dev/null +++ b/io/aerospike/record_reader_paginated.go @@ -0,0 +1,192 @@ +// Copyright 2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aerospike + +import ( + "fmt" + "io" + + a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/models" +) + +// pageRecord contains records and serialized filter. +type pageRecord struct { + result *a.Result + filter *models.PartitionFilterSerialized +} + +func newPageRecord(result *a.Result, filter *models.PartitionFilterSerialized) *pageRecord { + return &pageRecord{ + result: result, + filter: filter, + } +} + +// readPage reads the next record from pageRecord from the Aerospike database. +func (r *RecordReader) readPage() (*models.Token, error) { + errChan := make(chan error) + + if r.pageRecordsChan == nil { + r.pageRecordsChan = make(chan *pageRecord) + go r.startScanPaginated(errChan) + } + + select { + case err := <-errChan: + if err != nil { + return nil, err + } + case res, active := <-r.pageRecordsChan: + if !active { + r.logger.Debug("scan finished") + return nil, io.EOF + } + + if res.result == nil { + return nil, io.EOF + } + + if res.result.Err != nil { + r.logger.Error("error reading record", "error", res.result.Err) + return nil, res.result.Err + } + + rec := models.Record{ + Record: res.result.Record, + } + + recToken := models.NewRecordToken(&rec, 0, res.filter) + + return recToken, nil + } + + return nil, nil +} + +// startScanPaginated starts the scan for the RecordReader only for state save! 
+func (r *RecordReader) startScanPaginated(localErrChan chan error) { + scanPolicy := *r.config.scanPolicy + scanPolicy.FilterExpression = getScanExpression(r.config.timeBounds, r.config.noTTLOnly) + + setsToScan := r.config.setList + if len(setsToScan) == 0 { + setsToScan = []string{""} + } + + if r.config.scanLimiter != nil { + err := r.config.scanLimiter.Acquire(r.ctx, int64(len(setsToScan))) + if err != nil { + localErrChan <- err + return + } + } + + for _, set := range setsToScan { + resultChan, errChan := r.streamPartitionPages( + &scanPolicy, + set, + ) + + for { + select { + case err, ok := <-errChan: + if !ok { + break + } + + if err != nil { + localErrChan <- err + return + } + case result, ok := <-resultChan: + if !ok { + // After we finish all the readings, we close pageRecord chan. + close(r.pageRecordsChan) + close(localErrChan) + + return + } + + for i := range result { + r.pageRecordsChan <- result[i] + } + } + } + } +} + +// streamPartitionPages reads the whole pageRecord and send it to resultChan. +func (r *RecordReader) streamPartitionPages( + scanPolicy *a.ScanPolicy, + set string, +) (resultChan chan []*pageRecord, errChan chan error) { + scanPolicy.MaxRecords = r.config.pageSize + // resultChan must not be buffered, we send the whole pageRecord to resultChan. + // So if we make it buffered, we will consume a lot of RAM. + resultChan = make(chan []*pageRecord) + errChan = make(chan error) + + go func() { + // For one iteration, we scan 1 pageRecord. + for { + curFilter, err := models.NewPartitionFilterSerialized(r.config.partitionFilter) + if err != nil { + errChan <- fmt.Errorf("failed to serialize partition filter: %w", err) + } + + recSet, aErr := r.client.ScanPartitions( + scanPolicy, + r.config.partitionFilter, + r.config.namespace, + set, + r.config.binList..., + ) + if aErr != nil { + errChan <- fmt.Errorf("failed to scan sets: %w", aErr.Unwrap()) + } + + // result contains []*a.Result and serialized filter models.PartitionFilterSerialized + result := make([]*pageRecord, 0, r.config.pageSize) + + // to count records on pageRecord. + var counter int64 + for res := range recSet.Results() { + counter++ + + if res.Err != nil { + continue + } + // Save to pageRecord filter that returns current pageRecord. + result = append(result, newPageRecord(res, &curFilter)) + } + + if aErr = recSet.Close(); aErr != nil { + errChan <- fmt.Errorf("failed to close record set: %w", aErr.Unwrap()) + } + + resultChan <- result + // If there were no records on the pageRecord, we think that it was last pageRecord and exit. 
+ if counter == 0 { + close(resultChan) + close(errChan) + + return + } + } + }() + + return resultChan, errChan +} diff --git a/io/aerospike/record_reader_test.go b/io/aerospike/record_reader_test.go index 56496309..90f51513 100644 --- a/io/aerospike/record_reader_test.go +++ b/io/aerospike/record_reader_test.go @@ -85,7 +85,7 @@ func (suite *readersTestSuite) TestAerospikeRecordReader() { v, err := reader.Read() suite.Nil(err) - expectedRecToken := models.NewRecordToken(mockRec, 0) + expectedRecToken := models.NewRecordToken(mockRec, 0, nil) suite.Equal(expectedRecToken, v) mockScanner.AssertExpectations(suite.T()) } @@ -273,7 +273,7 @@ func (suite *readersTestSuite) TestAerospikeRecordReaderWithPolicy() { v, err := reader.Read() suite.Nil(err) - expectedRecToken := models.NewRecordToken(mockRec, 0) + expectedRecToken := models.NewRecordToken(mockRec, 0, nil) suite.Equal(expectedRecToken, v) mockScanner.AssertExpectations(suite.T()) } diff --git a/io/aerospike/result_sets.go b/io/aerospike/record_sets.go similarity index 100% rename from io/aerospike/result_sets.go rename to io/aerospike/record_sets.go diff --git a/io/aws/s3/options.go b/io/aws/s3/options.go index bd956914..153e3146 100644 --- a/io/aws/s3/options.go +++ b/io/aws/s3/options.go @@ -30,6 +30,8 @@ type options struct { // startAfter is where you want Amazon S3 to start listing from. Amazon S3 starts // listing after this specified key. StartAfter can be any key in the bucket. startAfter string + // skipDirCheck if true, backup directory won't be checked. + skipDirCheck bool } type Opt func(*options) @@ -80,3 +82,11 @@ func WithStartAfter(v string) Opt { r.startAfter = v } } + +// WithSkipDirCheck adds skip dir check flags. +// Which means that backup directory won't be checked for emptiness. +func WithSkipDirCheck() Opt { + return func(r *options) { + r.skipDirCheck = true + } +} diff --git a/io/aws/s3/reader.go b/io/aws/s3/reader.go index 58201370..3b0ac602 100644 --- a/io/aws/s3/reader.go +++ b/io/aws/s3/reader.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "path/filepath" "strings" "github.com/aws/aws-sdk-go-v2/aws" @@ -76,7 +77,7 @@ func NewReader( if _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{ Bucket: aws.String(bucketName), }); err != nil { - return nil, fmt.Errorf("bucket does not exist or you don't have access: %w", err) + return nil, fmt.Errorf("bucket %s does not exist or you don't have access: %w", bucketName, err) } // S3 storage can read/write to "/" prefix, so we should replace it with "". @@ -110,7 +111,7 @@ func (r *Reader) StreamFiles( } // If not a folder, only file. - r.streamFile(ctx, r.path, readersCh, errorsCh) + r.StreamFile(ctx, r.path, readersCh, errorsCh) } func (r *Reader) streamDirectory( @@ -169,12 +170,16 @@ func (r *Reader) streamDirectory( } } -// streamFile opens single file from s3 and sends io.Readers to the `readersCh` +// StreamFile opens single file from s3 and sends io.Readers to the `readersCh` // In case of an error, it is sent to the `errorsCh` channel. 
-func (r *Reader) streamFile( +func (r *Reader) StreamFile( ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) { defer close(readersCh) + if r.isDir { + filename = filepath.Join(r.path, filename) + } + object, err := r.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &r.bucketName, Key: &filename, diff --git a/io/aws/s3/writer.go b/io/aws/s3/writer.go index f14069fd..0d4bc16b 100644 --- a/io/aws/s3/writer.go +++ b/io/aws/s3/writer.go @@ -87,10 +87,10 @@ func NewWriter( Bucket: aws.String(bucketName), }) if err != nil { - return nil, fmt.Errorf("bucket does not exist or you don't have access: %w", err) + return nil, fmt.Errorf("bucket %s does not exist or you don't have access: %w", bucketName, err) } - if w.isDir { + if w.isDir && !w.skipDirCheck { // Check if backup dir is empty. isEmpty, err := isEmptyDirectory(ctx, client, bucketName, w.prefix) if err != nil { diff --git a/io/azure/blob/options.go b/io/azure/blob/options.go index ae7bcca3..67fa8cc6 100644 --- a/io/azure/blob/options.go +++ b/io/azure/blob/options.go @@ -38,6 +38,8 @@ type options struct { // as the value for the marker parameter in a subsequent call to request the next // page of list items. The marker value is opaque to the client. marker string + // skipDirCheck if true, backup directory won't be checked. + skipDirCheck bool } type Opt func(*options) @@ -98,3 +100,11 @@ func WithMarker(v string) Opt { r.marker = v } } + +// WithSkipDirCheck adds skip dir check flags. +// Which means that backup directory won't be checked for emptiness. +func WithSkipDirCheck() Opt { + return func(r *options) { + r.skipDirCheck = true + } +} diff --git a/io/azure/blob/reader.go b/io/azure/blob/reader.go index df326e74..92177cd1 100644 --- a/io/azure/blob/reader.go +++ b/io/azure/blob/reader.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "path/filepath" "strings" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" @@ -99,7 +100,7 @@ func (r *Reader) StreamFiles( } // If not a folder, only file. - r.streamFile(ctx, r.path, readersCh, errorsCh) + r.StreamFile(ctx, r.path, readersCh, errorsCh) } func (r *Reader) streamDirectory( @@ -146,12 +147,16 @@ func (r *Reader) streamDirectory( } } -// streamFile opens a single file from GCP cloud storage and sends io.Readers to the `readersCh` +// StreamFile opens a single file from GCP cloud storage and sends io.Readers to the `readersCh` // In case of an error, it is sent to the `errorsCh` channel. -func (r *Reader) streamFile( +func (r *Reader) StreamFile( ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) { defer close(readersCh) + if r.isDir { + filename = filepath.Join(r.path, filename) + } + resp, err := r.client.DownloadStream(ctx, r.containerName, filename, nil) if err != nil { errorsCh <- fmt.Errorf("failed to create reader from file %s: %w", filename, err) diff --git a/io/azure/blob/writer.go b/io/azure/blob/writer.go index 15f6421d..3976464a 100644 --- a/io/azure/blob/writer.go +++ b/io/azure/blob/writer.go @@ -80,7 +80,7 @@ func NewWriter( return nil, fmt.Errorf("unable to get container properties: %w", err) } - if w.isDir { + if w.isDir && !w.skipDirCheck { // Check if backup dir is empty. 
isEmpty, err := isEmptyDirectory(ctx, client, containerName, prefix) if err != nil { diff --git a/io/encoding/asb/decode.go b/io/encoding/asb/decode.go index 4c024cf1..af8a2c5e 100644 --- a/io/encoding/asb/decode.go +++ b/io/encoding/asb/decode.go @@ -161,7 +161,7 @@ func (r *Decoder) NextToken() (*models.Token, error) { case *models.UDF: return models.NewUDFToken(v, size), nil case *models.Record: - return models.NewRecordToken(v, size), nil + return models.NewRecordToken(v, size, nil), nil default: return nil, fmt.Errorf("unsupported token type %T", v) } diff --git a/io/encoding/asb/decode_test.go b/io/encoding/asb/decode_test.go index e0e397e3..3f07da67 100644 --- a/io/encoding/asb/decode_test.go +++ b/io/encoding/asb/decode_test.go @@ -3718,7 +3718,7 @@ func TestASBReader_NextToken(t *testing.T) { Generation: 10, }, VoidTime: 10, - }, 106), + }, 106, nil), }, { name: "negative EOF", diff --git a/io/encoding/asb/encode.go b/io/encoding/asb/encode.go index 2b99842e..f4dbcab0 100644 --- a/io/encoding/asb/encode.go +++ b/io/encoding/asb/encode.go @@ -48,8 +48,8 @@ func NewEncoder(namespace string, compact bool) *Encoder { } // GenerateFilename generates a file name for the given namespace. -func (e *Encoder) GenerateFilename() string { - return fmt.Sprintf("%s_%d.asb", e.namespace, e.id.Add(1)) +func (e *Encoder) GenerateFilename(prefix, suffix string) string { + return fmt.Sprintf("%s%s_%d%s.asb", prefix, e.namespace, e.id.Add(1), suffix) } // EncodeToken encodes a token to the ASB format. diff --git a/io/gcp/storage/options.go b/io/gcp/storage/options.go index f55d71e2..b6b54c87 100644 --- a/io/gcp/storage/options.go +++ b/io/gcp/storage/options.go @@ -30,6 +30,8 @@ type options struct { // startOffset is used to filter results to objects whose names are // lexicographically equal to or after startOffset. startOffset string + // skipDirCheck if true, backup directory won't be checked. + skipDirCheck bool } type Opt func(*options) @@ -82,3 +84,11 @@ func WithStartOffset(v string) Opt { r.startOffset = v } } + +// WithSkipDirCheck adds skip dir check flags. +// Which means that backup directory won't be checked for emptiness. +func WithSkipDirCheck() Opt { + return func(r *options) { + r.skipDirCheck = true + } +} diff --git a/io/gcp/storage/reader.go b/io/gcp/storage/reader.go index b3ad85c3..1e9d4573 100644 --- a/io/gcp/storage/reader.go +++ b/io/gcp/storage/reader.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "path/filepath" "strings" "cloud.google.com/go/storage" @@ -103,7 +104,7 @@ func (r *Reader) StreamFiles( } // If not a folder, only file. - r.streamFile(ctx, r.path, readersCh, errorsCh) + r.StreamFile(ctx, r.path, readersCh, errorsCh) } func (r *Reader) streamDirectory( @@ -159,12 +160,16 @@ func (r *Reader) streamDirectory( } } -// streamFile opens a single file from GCP cloud storage and sends io.Readers to the `readersCh` +// StreamFile opens a single file from GCP cloud storage and sends io.Readers to the `readersCh` // In case of an error, it is sent to the `errorsCh` channel. 
-func (r *Reader) streamFile( +func (r *Reader) StreamFile( ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) { defer close(readersCh) + if r.isDir { + filename = filepath.Join(r.path, filename) + } + reader, err := r.bucketHandle.Object(filename).NewReader(ctx) if err != nil { errorsCh <- fmt.Errorf("failed to open %s: %w", filename, err) diff --git a/io/gcp/storage/writer.go b/io/gcp/storage/writer.go index 776d1168..27cc19d2 100644 --- a/io/gcp/storage/writer.go +++ b/io/gcp/storage/writer.go @@ -80,7 +80,7 @@ func NewWriter( return nil, fmt.Errorf("failed to get bucketHandler %s attr: %w", bucketName, err) } - if w.isDir { + if w.isDir && !w.skipDirCheck { // Check if backup dir is empty. isEmpty, err := isEmptyDirectory(ctx, bucketHandler, prefix) if err != nil { diff --git a/io/local/options.go b/io/local/options.go new file mode 100644 index 00000000..834c9be6 --- /dev/null +++ b/io/local/options.go @@ -0,0 +1,81 @@ +// Copyright 2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +type options struct { + // path contains path to file or directory. + path string + // isDir flag describes what we have in path, file or directory. + isDir bool + // isRemovingFiles flag describes should we remove everything from backup folder or not. + isRemovingFiles bool + // validator contains files validator that is applied to files if isDir = true. + validator validator + // withNestedDir describes if we should check for if an object is a directory for read/write operations. + // When we stream files or delete files in folder, we skip directories. This flag will avoid skipping. + // Default: false + withNestedDir bool + // skipDirCheck if true, backup directory won't be checked. + skipDirCheck bool +} + +type Opt func(*options) + +// WithDir adds directory to reading/writing files from/to. +func WithDir(path string) Opt { + return func(r *options) { + r.path = path + r.isDir = true + } +} + +// WithFile adds a file path to reading/writing from/to. +func WithFile(path string) Opt { + return func(r *options) { + r.path = path + r.isDir = false + } +} + +// WithValidator adds validator to Reader, so files will be validated before reading. +// Is used only for Reader. +func WithValidator(v validator) Opt { + return func(r *options) { + r.validator = v + } +} + +// WithNestedDir adds withNestedDir = true parameter. That means that we won't skip nested folders. +func WithNestedDir() Opt { + return func(r *options) { + r.withNestedDir = true + } +} + +// WithRemoveFiles adds remove files flag, so all files will be removed from backup folder before backup. +// Is used only for Writer. +func WithRemoveFiles() Opt { + return func(r *options) { + r.isRemovingFiles = true + } +} + +// WithSkipDirCheck adds skip dir check flags. +// Which means that backup directory won't be checked for emptiness. 
+func WithSkipDirCheck() Opt { + return func(r *options) { + r.skipDirCheck = true + } +} diff --git a/io/local/reader.go b/io/local/reader.go index 9ed53e7a..cd6407bc 100644 --- a/io/local/reader.go +++ b/io/local/reader.go @@ -38,54 +38,6 @@ type Reader struct { options } -type options struct { - // path contains path to file or directory. - path string - // isDir flag describes what we have in path, file or directory. - isDir bool - // isRemovingFiles flag describes should we remove everything from backup folder or not. - isRemovingFiles bool - // validator contains files validator that is applied to files if isDir = true. - validator validator - // withNestedDir describes if we should check for if an object is a directory for read/write operations. - // When we stream files or delete files in folder, we skip directories. This flag will avoid skipping. - // Default: false - withNestedDir bool -} - -type Opt func(*options) - -// WithDir adds directory to reading/writing files from/to. -func WithDir(path string) Opt { - return func(r *options) { - r.path = path - r.isDir = true - } -} - -// WithFile adds a file path to reading/writing from/to. -func WithFile(path string) Opt { - return func(r *options) { - r.path = path - r.isDir = false - } -} - -// WithValidator adds validator to Reader, so files will be validated before reading. -// Is used only for Reader. -func WithValidator(v validator) Opt { - return func(r *options) { - r.validator = v - } -} - -// WithNestedDir adds withNestedDir = true parameter. That means that we won't skip nested folders. -func WithNestedDir() Opt { - return func(r *options) { - r.withNestedDir = true - } -} - // NewReader creates a new local directory/file Reader. // Must be called with WithDir(path string) or WithFile(path string) - mandatory. // Can be called with WithValidator(v validator) - optional. @@ -124,7 +76,7 @@ func (r *Reader) StreamFiles( } // If not a folder, only file. - r.streamFile(ctx, r.path, readersCh, errorsCh) + r.StreamFile(ctx, r.path, readersCh, errorsCh) } func (r *Reader) streamDirectory( @@ -181,15 +133,19 @@ func (r *Reader) streamDirectory( } } -// streamFile opens single file and sends io.Readers to the `readersCh` +// StreamFile opens single file and sends io.Readers to the `readersCh` // In case of an error, it is sent to the `errorsCh` channel. -func (r *Reader) streamFile( +func (r *Reader) StreamFile( ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) { if ctx.Err() != nil { errorsCh <- ctx.Err() return } + if r.isDir { + filename = filepath.Join(r.path, filename) + } + reader, err := os.Open(filename) if err != nil { errorsCh <- fmt.Errorf("failed to open %s: %w", filename, err) diff --git a/io/local/writer.go b/io/local/writer.go index 79ea37e3..6c1f3250 100644 --- a/io/local/writer.go +++ b/io/local/writer.go @@ -34,14 +34,6 @@ type Writer struct { called atomic.Bool } -// WithRemoveFiles adds remove files flag, so all files will be removed from backup folder before backup. -// Is used only for Writer. -func WithRemoveFiles() Opt { - return func(r *options) { - r.isRemovingFiles = true - } -} - // NewWriter creates a new writer for local directory/file writes. // Must be called with WithDir(path string) or WithFile(path string) - mandatory. // Can be called with WithRemoveFiles() - optional. 
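
The shared option set above now covers both the local Reader and Writer. As a rough usage sketch (the directory name is a placeholder; the combination mirrors the continuation tests later in this diff), a writer for a continuation run skips the emptiness check on an existing backup directory, while a reader over the same directory is what the state file is later loaded from:

```go
package main

import (
	"context"
	"log"

	"github.com/aerospike/backup-go/io/encoding/asb"
	"github.com/aerospike/backup-go/io/local"
)

func main() {
	ctx := context.Background()

	// Writer over an existing, non-empty backup directory: the emptiness
	// check is skipped because a continuation run writes next to files
	// produced by the interrupted run. "backups_folder" is a placeholder.
	writer, err := local.NewWriter(
		ctx,
		local.WithSkipDirCheck(),
		local.WithValidator(asb.NewValidator()),
		local.WithDir("backups_folder"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Reader over the same directory; the backup handler uses it to load
	// the saved state file when Continue is set.
	reader, err := local.NewReader(
		local.WithDir("backups_folder"),
	)
	if err != nil {
		log.Fatal(err)
	}

	_, _ = writer, reader
}
```
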
@@ -61,7 +53,7 @@ func NewWriter(ctx context.Context, opts ...Opt) (*Writer, error) { return nil, fmt.Errorf("failed to prepare backup directory: %w", err) } - if w.isDir { + if w.isDir && !w.skipDirCheck { // Check if backup dir is empty. isEmpty, err := isEmptyDirectory(w.path) if err != nil { @@ -194,6 +186,7 @@ func (w *Writer) NewWriter(ctx context.Context, fileName string) (io.WriteCloser if ctx.Err() != nil { return nil, ctx.Err() } + // protection for single file backup. if !w.isDir { if !w.called.CompareAndSwap(false, true) { diff --git a/io/sized/writer.go b/io/sized/writer.go index be4d9bee..0ec7e27c 100644 --- a/io/sized/writer.go +++ b/io/sized/writer.go @@ -29,20 +29,25 @@ type Writer struct { open func(context.Context) (io.WriteCloser, error) size int64 limit int64 + // Number of writer, for saving state. + n int + saveCommandChan chan int } // NewWriter creates a new Writer writer with a size limit. // limit must be greater than 0. -func NewWriter(ctx context.Context, limit int64, +func NewWriter(ctx context.Context, n int, saveCommandChan chan int, limit int64, open func(context.Context) (io.WriteCloser, error)) (*Writer, error) { if limit <= 0 { return nil, fmt.Errorf("limit must be greater than 0, got %d", limit) } return &Writer{ - ctx: ctx, - limit: limit, - open: open, + ctx: ctx, + limit: limit, + open: open, + n: n, + saveCommandChan: saveCommandChan, }, nil } @@ -53,6 +58,10 @@ func (f *Writer) Write(p []byte) (n int, err error) { return 0, fmt.Errorf("failed to close writer: %w", err) } + if f.saveCommandChan != nil { + f.saveCommandChan <- f.n + } + f.size = 0 f.writer = nil } diff --git a/io/sized/writer_test.go b/io/sized/writer_test.go index f85c1098..e498a1bf 100644 --- a/io/sized/writer_test.go +++ b/io/sized/writer_test.go @@ -61,7 +61,7 @@ func (suite *sizedTestSuite) Test_writeCloserSized() { return writer2, nil } - wcs, err := NewWriter(context.Background(), 10, open) + wcs, err := NewWriter(context.Background(), 1, nil, 10, open) suite.NotNil(wcs) suite.Nil(err) @@ -111,6 +111,6 @@ func (suite *sizedTestSuite) Test_writeCloserSized_ErrLimit() { return writer2, nil } - _, err := NewWriter(context.Background(), -1, open) + _, err := NewWriter(context.Background(), 1, nil, -1, open) require.ErrorContains(suite.T(), err, "limit must be greater than 0") } diff --git a/io_encoding.go b/io_encoding.go index b2a0e573..fff19872 100644 --- a/io_encoding.go +++ b/io_encoding.go @@ -36,7 +36,7 @@ const ( type Encoder interface { EncodeToken(*models.Token) ([]byte, error) GetHeader() []byte - GenerateFilename() string + GenerateFilename(prefix, suffix string) string } // NewEncoder returns a new Encoder according to `EncoderType`. 
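
The prefix/suffix variant of `GenerateFilename` shown above composes names as `<prefix><namespace>_<id><suffix>.asb`. A small sketch of the resulting names (the `backup_` prefix stands in for `--output-file-prefix`, and `(1)` for the suffix the state service derives from its run counter):

```go
package main

import (
	"fmt"

	"github.com/aerospike/backup-go/io/encoding/asb"
)

func main() {
	enc := asb.NewEncoder("test", false)

	// Plain run: no prefix, no suffix.
	fmt.Println(enc.GenerateFilename("", "")) // test_1.asb

	// Prefixed file name for the first continuation run.
	fmt.Println(enc.GenerateFilename("backup_", "(1)")) // backup_test_2(1).asb
}
```
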
diff --git a/mocks/Encoder_mock.go b/mocks/Encoder_mock.go index 8f352f7a..6c4fa22d 100644 --- a/mocks/Encoder_mock.go +++ b/mocks/Encoder_mock.go @@ -78,17 +78,17 @@ func (_c *MockEncoder_EncodeToken_Call) RunAndReturn(run func(*models.Token) ([] return _c } -// GenerateFilename provides a mock function with given fields: -func (_m *MockEncoder) GenerateFilename() string { - ret := _m.Called() +// GenerateFilename provides a mock function with given fields: prefix, suffix +func (_m *MockEncoder) GenerateFilename(prefix string, suffix string) string { + ret := _m.Called(prefix, suffix) if len(ret) == 0 { panic("no return value specified for GenerateFilename") } var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(string, string) string); ok { + r0 = rf(prefix, suffix) } else { r0 = ret.Get(0).(string) } @@ -102,13 +102,15 @@ type MockEncoder_GenerateFilename_Call struct { } // GenerateFilename is a helper method to define mock.On call -func (_e *MockEncoder_Expecter) GenerateFilename() *MockEncoder_GenerateFilename_Call { - return &MockEncoder_GenerateFilename_Call{Call: _e.mock.On("GenerateFilename")} +// - prefix string +// - suffix string +func (_e *MockEncoder_Expecter) GenerateFilename(prefix interface{}, suffix interface{}) *MockEncoder_GenerateFilename_Call { + return &MockEncoder_GenerateFilename_Call{Call: _e.mock.On("GenerateFilename", prefix, suffix)} } -func (_c *MockEncoder_GenerateFilename_Call) Run(run func()) *MockEncoder_GenerateFilename_Call { +func (_c *MockEncoder_GenerateFilename_Call) Run(run func(prefix string, suffix string)) *MockEncoder_GenerateFilename_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(string), args[1].(string)) }) return _c } @@ -118,7 +120,7 @@ func (_c *MockEncoder_GenerateFilename_Call) Return(_a0 string) *MockEncoder_Gen return _c } -func (_c *MockEncoder_GenerateFilename_Call) RunAndReturn(run func() string) *MockEncoder_GenerateFilename_Call { +func (_c *MockEncoder_GenerateFilename_Call) RunAndReturn(run func(string, string) string) *MockEncoder_GenerateFilename_Call { _c.Call.Return(run) return _c } diff --git a/mocks/StreamingReader_mock.go b/mocks/StreamingReader_mock.go index 1d0c17f7..d24b1ddd 100644 --- a/mocks/StreamingReader_mock.go +++ b/mocks/StreamingReader_mock.go @@ -67,6 +67,42 @@ func (_c *MockStreamingReader_GetType_Call) RunAndReturn(run func() string) *Moc return _c } +// StreamFile provides a mock function with given fields: ctx, filename, readersCh, errorsCh +func (_m *MockStreamingReader) StreamFile(ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) { + _m.Called(ctx, filename, readersCh, errorsCh) +} + +// MockStreamingReader_StreamFile_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamFile' +type MockStreamingReader_StreamFile_Call struct { + *mock.Call +} + +// StreamFile is a helper method to define mock.On call +// - ctx context.Context +// - filename string +// - readersCh chan<- io.ReadCloser +// - errorsCh chan<- error +func (_e *MockStreamingReader_Expecter) StreamFile(ctx interface{}, filename interface{}, readersCh interface{}, errorsCh interface{}) *MockStreamingReader_StreamFile_Call { + return &MockStreamingReader_StreamFile_Call{Call: _e.mock.On("StreamFile", ctx, filename, readersCh, errorsCh)} +} + +func (_c *MockStreamingReader_StreamFile_Call) Run(run func(ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- 
error)) *MockStreamingReader_StreamFile_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(chan<- io.ReadCloser), args[3].(chan<- error)) + }) + return _c +} + +func (_c *MockStreamingReader_StreamFile_Call) Return() *MockStreamingReader_StreamFile_Call { + _c.Call.Return() + return _c +} + +func (_c *MockStreamingReader_StreamFile_Call) RunAndReturn(run func(context.Context, string, chan<- io.ReadCloser, chan<- error)) *MockStreamingReader_StreamFile_Call { + _c.Call.Return(run) + return _c +} + // StreamFiles provides a mock function with given fields: _a0, _a1, _a2 func (_m *MockStreamingReader) StreamFiles(_a0 context.Context, _a1 chan<- io.ReadCloser, _a2 chan<- error) { _m.Called(_a0, _a1, _a2) diff --git a/mocks/Writer_mock.go b/mocks/Writer_mock.go index d83e6ff4..1642431f 100644 --- a/mocks/Writer_mock.go +++ b/mocks/Writer_mock.go @@ -126,6 +126,52 @@ func (_c *MockWriter_NewWriter_Call) RunAndReturn(run func(context.Context, stri return _c } +// RemoveFiles provides a mock function with given fields: ctx +func (_m *MockWriter) RemoveFiles(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for RemoveFiles") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWriter_RemoveFiles_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveFiles' +type MockWriter_RemoveFiles_Call struct { + *mock.Call +} + +// RemoveFiles is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockWriter_Expecter) RemoveFiles(ctx interface{}) *MockWriter_RemoveFiles_Call { + return &MockWriter_RemoveFiles_Call{Call: _e.mock.On("RemoveFiles", ctx)} +} + +func (_c *MockWriter_RemoveFiles_Call) Run(run func(ctx context.Context)) *MockWriter_RemoveFiles_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockWriter_RemoveFiles_Call) Return(_a0 error) *MockWriter_RemoveFiles_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriter_RemoveFiles_Call) RunAndReturn(run func(context.Context) error) *MockWriter_RemoveFiles_Call { + _c.Call.Return(run) + return _c +} + // NewMockWriter creates a new instance of MockWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockWriter(t interface { diff --git a/models/data_models.go b/models/data_models.go index 33833229..9863d664 100644 --- a/models/data_models.go +++ b/models/data_models.go @@ -101,14 +101,18 @@ type Token struct { Record *Record Type TokenType Size uint64 + // Filter represents serialized partition filter for page, that record belongs to. + // Is used only on pagination read, to save reading states. + Filter *PartitionFilterSerialized } // NewRecordToken creates a new token with the given record. -func NewRecordToken(r *Record, size uint64) *Token { +func NewRecordToken(r *Record, size uint64, filter *PartitionFilterSerialized) *Token { return &Token{ Record: r, Type: TokenTypeRecord, Size: size, + Filter: filter, } } diff --git a/models/partition_filter_serialized.go b/models/partition_filter_serialized.go new file mode 100644 index 00000000..56307d3a --- /dev/null +++ b/models/partition_filter_serialized.go @@ -0,0 +1,61 @@ +// Copyright 2024 Aerospike, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package models + +import ( + "fmt" + + a "github.com/aerospike/aerospike-client-go/v7" +) + +// PartitionFilterSerialized represent serialized a.PartitionFilter. +// To save cursor state. +type PartitionFilterSerialized struct { + Begin int + Count int + Digest []byte + Cursor []byte + // Worker number. + N int +} + +// NewPartitionFilterSerialized serialize *a.PartitionFilter and returns new PartitionFilterSerialized instance. +func NewPartitionFilterSerialized(pf *a.PartitionFilter) (PartitionFilterSerialized, error) { + if pf == nil || pf.IsDone() { + return PartitionFilterSerialized{}, nil + } + + c, err := pf.EncodeCursor() + if err != nil { + return PartitionFilterSerialized{}, fmt.Errorf("failed to encode cursor: %w", err) + } + + return PartitionFilterSerialized{ + Begin: pf.Begin, + Count: pf.Count, + Digest: pf.Digest, + Cursor: c, + }, nil +} + +// Decode decodes *PartitionFilterSerialized to *a.PartitionFilter +func (p *PartitionFilterSerialized) Decode() (*a.PartitionFilter, error) { + pf := &a.PartitionFilter{Begin: p.Begin, Count: p.Count, Digest: p.Digest} + if err := pf.DecodeCursor(p.Cursor); err != nil { + return nil, fmt.Errorf("failed to decode cursor: %w", err) + } + + return pf, nil +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index eb1ecbe4..0516ca55 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -16,6 +16,7 @@ package pipeline import ( "context" + "fmt" "sync" ) @@ -42,6 +43,9 @@ type Pipeline[T any] struct { receive <-chan T send chan<- T stages []*stage[T] + // For synced pipeline we must create same number of workers for each stage. + // Then we will initialize communication channels strait from worker to worker through stages. + isSynced bool } var _ Worker[any] = (*Pipeline[any])(nil) @@ -49,16 +53,31 @@ var _ Worker[any] = (*Pipeline[any])(nil) const channelSize = 256 // NewPipeline creates a new DataPipeline. -func NewPipeline[T any](workGroups ...[]Worker[T]) *Pipeline[T] { +func NewPipeline[T any](isSynced bool, workGroups ...[]Worker[T]) (*Pipeline[T], error) { + if len(workGroups) == 0 { + return nil, fmt.Errorf("workGroups is empty") + } + stages := make([]*stage[T], len(workGroups)) + // Check that all working groups have same number of workers. + if isSynced { + firstLen := len(workGroups[0]) + for i := range workGroups { + if len(workGroups[i]) != firstLen { + return nil, fmt.Errorf("all workers groups must be same length in sync mode") + } + } + } + for i, workers := range workGroups { - stages[i] = newStage(workers...) + stages[i] = newStage(isSynced, workers...) } return &Pipeline[T]{ - stages: stages, - } + stages: stages, + isSynced: isSynced, + }, nil } // SetReceiveChan sets the receive channel for the pipeline. 
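
`PartitionFilterSerialized`, added above, is a plain struct that gob can encode into the state file and that can be turned back into a live filter. A minimal round-trip sketch, assuming the client package's `NewPartitionFilterByRange` constructor:

```go
package main

import (
	"fmt"
	"log"

	a "github.com/aerospike/aerospike-client-go/v7"
	"github.com/aerospike/backup-go/models"
)

func main() {
	// A fresh filter over partitions 0..99.
	pf := a.NewPartitionFilterByRange(0, 100)

	// Capture the filter, including its scan cursor, for the state file.
	pfs, err := models.NewPartitionFilterSerialized(pf)
	if err != nil {
		log.Fatal(err)
	}

	// Rebuild a usable *a.PartitionFilter from the saved state.
	restored, err := pfs.Decode()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(restored.Begin, restored.Count) // 0 100
}
```
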
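
`NewPipeline` now validates its work groups and, with `isSynced` set, pins worker i of one stage to worker i of the next over dedicated unbuffered channels. A toy sketch of that wiring, assuming `pipeline.Worker[T]` is just the SetReceiveChan/SetSendChan/Run triple the stages call (`producer` and `counter` below are illustrative placeholders, not library workers):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aerospike/backup-go/pipeline"
)

// producer ignores its receive channel and emits a few values.
type producer struct {
	send chan<- int
}

func (p *producer) SetReceiveChan(<-chan int) {}
func (p *producer) SetSendChan(c chan<- int)  { p.send = c }
func (p *producer) Run(context.Context) error {
	for i := 0; i < 3; i++ {
		p.send <- i
	}
	return nil
}

// counter drains its receive channel until the stage closes it.
type counter struct {
	receive <-chan int
	seen    int
}

func (c *counter) SetReceiveChan(ch <-chan int) { c.receive = ch }
func (c *counter) SetSendChan(chan<- int)       {}
func (c *counter) Run(context.Context) error {
	for range c.receive {
		c.seen++
	}
	return nil
}

func main() {
	// Synced mode: every stage must have the same number of workers, and
	// producer i is wired directly to counter i through an unbuffered channel.
	producers := []pipeline.Worker[int]{&producer{}, &producer{}}
	counters := []*counter{{}, {}}

	p, err := pipeline.NewPipeline(true,
		producers,
		[]pipeline.Worker[int]{counters[0], counters[1]},
	)
	if err != nil {
		log.Fatal(err) // e.g. work groups of different lengths in sync mode
	}

	if err := p.Run(context.Background()); err != nil {
		log.Fatal(err)
	}

	fmt.Println(counters[0].seen + counters[1].seen) // 6
}
```
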
@@ -90,21 +109,51 @@ func (dp *Pipeline[T]) Run(ctx context.Context) error { errors := make(chan error, len(dp.stages)) - var lastSend chan T + var ( + lastSend []<-chan T + // To initialize pipeline workers correctly, we need to create empty channels for first and last stages. + emptySendChans []chan<- T + emptyReceiveChans []<-chan T + ) for _, s := range dp.stages { - send := make(chan T, channelSize) - s.SetSendChan(send) + sendChans := make([]chan<- T, 0, len(s.workers)) + receiveChans := make([]<-chan T, 0, len(s.workers)) + + emptySendChans = make([]chan<- T, 0, len(s.workers)) + emptyReceiveChans = make([]<-chan T, 0, len(s.workers)) + + if dp.isSynced { + for i := 0; i < len(s.workers); i++ { + // For synced mode, we don't add buffer to channels, not to lose any data. + send := make(chan T) + sendChans = append(sendChans, send) + receiveChans = append(receiveChans, send) + + empty := make(chan T) + emptySendChans = append(emptySendChans, empty) + emptyReceiveChans = append(emptyReceiveChans, empty) + } + } else { + send := make(chan T, channelSize) + sendChans = append(sendChans, send) + receiveChans = append(receiveChans, send) + + emptySendChans = append(emptySendChans, dp.send) + emptyReceiveChans = append(emptyReceiveChans, dp.receive) + } + + s.SetSendChan(sendChans) s.SetReceiveChan(lastSend) - lastSend = send + lastSend = receiveChans } // set the receive and send channels for first // and last stages to the pipeline's receive and send channels - dp.stages[0].SetReceiveChan(dp.receive) - dp.stages[len(dp.stages)-1].SetSendChan(dp.send) + dp.stages[0].SetReceiveChan(emptyReceiveChans) + dp.stages[len(dp.stages)-1].SetSendChan(emptySendChans) wg := &sync.WaitGroup{} for _, s := range dp.stages { @@ -133,22 +182,25 @@ func (dp *Pipeline[T]) Run(ctx context.Context) error { } type stage[T any] struct { - receive <-chan T - send chan<- T + receive []<-chan T + send []chan<- T workers []Worker[T] + // if synced, we distribute communication channels through workers. + isSynced bool } -func (s *stage[T]) SetReceiveChan(c <-chan T) { +func (s *stage[T]) SetReceiveChan(c []<-chan T) { s.receive = c } -func (s *stage[T]) SetSendChan(c chan<- T) { +func (s *stage[T]) SetSendChan(c []chan<- T) { s.send = c } -func newStage[T any](workers ...Worker[T]) *stage[T] { +func newStage[T any](isSynced bool, workers ...Worker[T]) *stage[T] { s := stage[T]{ - workers: workers, + workers: workers, + isSynced: isSynced, } return &s @@ -159,9 +211,17 @@ func (s *stage[T]) Run(ctx context.Context) error { return nil } - for _, w := range s.workers { - w.SetReceiveChan(s.receive) - w.SetSendChan(s.send) + for i, w := range s.workers { + if s.isSynced { + // If it is not sync mode, there will be 1 channel in each slice. + w.SetReceiveChan(s.receive[i]) + w.SetSendChan(s.send[i]) + + continue + } + // Else we distribute all channels to workers. + w.SetReceiveChan(s.receive[0]) + w.SetSendChan(s.send[0]) } ctx, cancel := context.WithCancel(ctx) @@ -187,8 +247,10 @@ func (s *stage[T]) Run(ctx context.Context) error { wg.Wait() - if s.send != nil { - close(s.send) + for i := range s.send { + if s.send[i] != nil { + close(s.send[i]) + } } close(errors) diff --git a/pipeline/pipline_test.go b/pipeline/pipline_test.go index 857cb685..12838ea7 100644 --- a/pipeline/pipline_test.go +++ b/pipeline/pipline_test.go @@ -35,7 +35,8 @@ func (suite *pipelineTestSuite) TestNewDataPipeline() { workers := [][]Worker[string]{{w1, w2}, {w3}} - pipeline := NewPipeline(workers...) 
+ pipeline, err := NewPipeline(false, workers...) + suite.Require().Nil(err) suite.NotNil(pipeline) } @@ -57,11 +58,12 @@ func (suite *pipelineTestSuite) TestDataPipelineRun() { workers := [][]Worker[string]{{w1, w2}, {w3}} - pipeline := NewPipeline(workers...) + pipeline, err := NewPipeline(false, workers...) + suite.Require().Nil(err) suite.NotNil(pipeline) ctx := context.Background() - err := pipeline.Run(ctx) + err = pipeline.Run(ctx) suite.Nil(err) } @@ -126,7 +128,8 @@ func (suite *pipelineTestSuite) TestDataPipelineRunWithChannels() { w4.EXPECT().Run(ctx) workers := [][]Worker[string]{{w1, w2}, {w3}, {w4}} - pipeline := NewPipeline(workers...) + pipeline, err := NewPipeline(false, workers...) + suite.Require().Nil(err) suite.NotNil(pipeline) receive := make(chan string, 2) @@ -139,7 +142,7 @@ func (suite *pipelineTestSuite) TestDataPipelineRunWithChannels() { receive <- "1" close(receive) - err := pipeline.Run(ctx) + err = pipeline.Run(ctx) suite.Nil(err) suite.Equal(2, len(send)) @@ -174,11 +177,12 @@ func (suite *pipelineTestSuite) TestDataPipelineRunWorkerFails() { workers := [][]Worker[string]{{w1, w2}, {w3}, {w4}} - pipeline := NewPipeline(workers...) + pipeline, err := NewPipeline(false, workers...) + suite.Require().Nil(err) suite.NotNil(pipeline) ctx := context.Background() - err := pipeline.Run(ctx) + err = pipeline.Run(ctx) suite.NotNil(err) } diff --git a/state.go b/state.go new file mode 100644 index 00000000..31f94b82 --- /dev/null +++ b/state.go @@ -0,0 +1,271 @@ +// Copyright 2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backup + +import ( + "context" + "encoding/gob" + "fmt" + "io" + "log/slog" + "sync" + "time" + + a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/models" +) + +// State contains current backups status data. +type State struct { + // Global backup context. + ctx context.Context + + // Counter to count how many times State instance was initialized. + // Is used to create suffix for backup files. + Counter int + // RecordsStateChan communication channel to save current filter state. + RecordsStateChan chan models.PartitionFilterSerialized + // RecordStates store states of all filters. + RecordStates map[int]models.PartitionFilterSerialized + + RecordStatesSaved map[int]models.PartitionFilterSerialized + // SaveCommandChan command to save current state for worker. + SaveCommandChan chan int + // Mutex for RecordStates operations. + // Ordinary mutex is used, because we must not allow any writings when we read state. + mu sync.Mutex + // File to save state to. + FileName string + + // writer is used to create a state file. + writer Writer + // logger for logging errors. + logger *slog.Logger +} + +// NewState returns new state instance depending on config. +// If we continue back up, the state will be loaded from a state file, +// if it is the first operation, new state instance will be returned. 
+func NewState( + ctx context.Context, + config *BackupConfig, + reader StreamingReader, + writer Writer, + logger *slog.Logger, +) (*State, error) { + switch { + case config.isStateFirstRun(): + return newState(ctx, config, writer, logger), nil + case config.isStateContinue(): + s, err := newStateFromFile(ctx, config, reader, writer, logger) + if err != nil { + return nil, err + } + // change filters in config. + config.PartitionFilters, err = s.loadPartitionFilters() + if err != nil { + return nil, err + } + + return s, nil + } + + return nil, nil +} + +// newState creates status service from parameters, for backup operations. +func newState( + ctx context.Context, + config *BackupConfig, + writer Writer, + logger *slog.Logger, +) *State { + s := &State{ + ctx: ctx, + // RecordsStateChan must not be buffered, so we can stop all operations. + RecordsStateChan: make(chan models.PartitionFilterSerialized), + RecordStates: make(map[int]models.PartitionFilterSerialized), + RecordStatesSaved: make(map[int]models.PartitionFilterSerialized), + SaveCommandChan: make(chan int), + FileName: config.StateFile, + writer: writer, + logger: logger, + } + // Run watcher on initialization. + go s.serve() + go s.serveRecords() + + return s +} + +// newStateFromFile creates a status service from the file, to continue operations. +func newStateFromFile( + ctx context.Context, + config *BackupConfig, + reader StreamingReader, + writer Writer, + logger *slog.Logger, +) (*State, error) { + f, err := openFile(ctx, reader, config.StateFile) + if err != nil { + return nil, fmt.Errorf("failed to open state file: %w", err) + } + + dec := gob.NewDecoder(f) + + var s State + if err = dec.Decode(&s); err != nil { + return nil, fmt.Errorf("failed to decode state: %w", err) + } + + s.ctx = ctx + s.writer = writer + s.logger = logger + s.RecordsStateChan = make(chan models.PartitionFilterSerialized) + s.SaveCommandChan = make(chan int) + s.Counter++ + + // Init current state. + for k, v := range s.RecordStatesSaved { + s.RecordStates[k] = v + } + + logger.Debug("loaded state file successfully", slog.Int("filters loaded", len(s.RecordStatesSaved))) + + // Run watcher on initialization. + go s.serve() + go s.serveRecords() + + return &s, nil +} + +// serve dumps files to disk. 
+func (s *State) serve() { + for msg := range s.SaveCommandChan { + if err := s.dump(msg); err != nil { + s.logger.Error("failed to dump state", slog.Any("error", err)) + return + } + } +} + +func (s *State) dump(n int) error { + file, err := s.writer.NewWriter(s.ctx, s.FileName) + if err != nil { + return fmt.Errorf("failed to create state file %s: %w", s.FileName, err) + } + + enc := gob.NewEncoder(file) + + s.mu.Lock() + + if n > -1 { + s.RecordStatesSaved[n] = s.RecordStates[n] + } + + if err = enc.Encode(s); err != nil { + return fmt.Errorf("failed to encode state data: %w", err) + } + + file.Close() + + s.mu.Unlock() + + s.logger.Debug("state file dumped", slog.Time("saved at", time.Now())) + + return nil +} + +func (s *State) InitState(pf []*a.PartitionFilter) error { + s.mu.Lock() + for i := range pf { + pfs, err := models.NewPartitionFilterSerialized(pf[i]) + if err != nil { + return err + } + + s.RecordStates[i] = pfs + s.RecordStatesSaved[i] = pfs + } + s.mu.Unlock() + + return s.dump(-1) +} + +func (s *State) loadPartitionFilters() ([]*a.PartitionFilter, error) { + s.mu.Lock() + + result := make([]*a.PartitionFilter, 0, len(s.RecordStatesSaved)) + + for _, state := range s.RecordStatesSaved { + f, err := state.Decode() + if err != nil { + return nil, err + } + + result = append(result, f) + } + + s.mu.Unlock() + + return result, nil +} + +func (s *State) serveRecords() { + var counter int + + for { + select { + case <-s.ctx.Done(): + return + case state := <-s.RecordsStateChan: + if state.Begin == 0 && state.Count == 0 && state.Digest == nil { + continue + } + + counter++ + + s.mu.Lock() + s.RecordStates[state.N] = state + s.mu.Unlock() + } + } +} + +func (s *State) getFileSuffix() string { + if s.Counter > 0 { + return fmt.Sprintf("(%d)", s.Counter) + } + + return "" +} + +func openFile(ctx context.Context, reader StreamingReader, fileName string) (io.ReadCloser, error) { + readCh := make(chan io.ReadCloser) + errCh := make(chan error) + + go reader.StreamFile(ctx, fileName, readCh, errCh) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-errCh: + return nil, err + case file := <-readCh: + return file, nil + } + } +} diff --git a/state_test.go b/state_test.go new file mode 100644 index 00000000..39914e11 --- /dev/null +++ b/state_test.go @@ -0,0 +1,100 @@ +// Copyright 2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package backup + +import ( + "context" + "log/slog" + "os" + "path/filepath" + "testing" + + a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/io/encoding/asb" + "github.com/aerospike/backup-go/io/local" + "github.com/aerospike/backup-go/models" + "github.com/stretchr/testify/require" +) + +const ( + testStateFile = "test_state_file" +) + +func TestState(t *testing.T) { + t.Parallel() + + testDir := t.TempDir() + tempFile := filepath.Join(testDir, testStateFile) + + testFilters := []*a.PartitionFilter{ + NewPartitionFilterByID(1), + NewPartitionFilterByID(2), + } + + ctx := context.Background() + + cfg := NewDefaultBackupConfig() + cfg.StateFile = testStateFile + cfg.PageSize = 100000 + cfg.SyncPipelines = true + cfg.PartitionFilters = testFilters + + reader, err := local.NewReader( + local.WithDir(testDir), + ) + require.NoError(t, err) + + writer, err := local.NewWriter( + ctx, + local.WithValidator(asb.NewValidator()), + local.WithSkipDirCheck(), + local.WithDir(testDir), + ) + require.NoError(t, err) + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + // Check init. + state, err := NewState(ctx, cfg, reader, writer, logger) + require.NotNil(t, state) + require.NoError(t, err) + + err = state.InitState(testFilters) + require.NoError(t, err) + + for i := range testFilters { + pfs, err := models.NewPartitionFilterSerialized(testFilters[i]) + require.NoError(t, err) + state.RecordsStateChan <- pfs + } + + // Check that file exists. + _, err = os.Stat(tempFile) + require.NoError(t, err) + + result := []*a.PartitionFilter{ + NewPartitionFilterByID(1), + NewPartitionFilterByID(2), + } + + // Check restore. + newCtx := context.Background() + cfg.Continue = true + newState, err := NewState(newCtx, cfg, reader, writer, logger) + require.NoError(t, err) + newPf, err := newState.loadPartitionFilters() + require.NoError(t, err) + require.EqualValues(t, newPf, result) +} diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index d87b41a6..c46ae953 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "log/slog" + "math/rand" "os" "testing" "time" @@ -238,6 +239,7 @@ func runBackupRestore(suite *backupRestoreTestSuite, backupConfig *backup.Backup ctx, backupConfig, &dst, + nil, ) suite.Nil(err) suite.NotNil(bh) @@ -384,6 +386,7 @@ func runBackupRestoreDirectory(suite *backupRestoreTestSuite, ctx, backupConfig, writers, + nil, ) suite.Nil(err) suite.NotNil(bh) @@ -463,7 +466,7 @@ func (suite *backupRestoreTestSuite) TestRestoreExpiredRecords() { VoidTime: 1, } - token := models.NewRecordToken(modelRec, 0) + token := models.NewRecordToken(modelRec, 0, nil) v, err := encoder.EncodeToken(token) if err != nil { suite.FailNow(err.Error()) @@ -556,6 +559,7 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreIOWithPartitions() { ctx, backupConfig, writers, + nil, ) suite.Nil(err) suite.NotNil(bh) @@ -595,6 +599,7 @@ func (suite *backupRestoreTestSuite) TestBackupContext() { ctx, backup.NewDefaultBackupConfig(), &writer, + nil, ) suite.NotNil(bh) suite.Nil(err) @@ -746,6 +751,7 @@ func (suite *backupRestoreTestSuite) TestBackupParallelNodes() { ctx, bCfg, &dst, + nil, ) suite.NotNil(bh) suite.Nil(err) @@ -763,6 +769,7 @@ func (suite *backupRestoreTestSuite) TestBackupParallelNodesList() { ctx, bCfg, &dst, + nil, ) suite.NotNil(bh) suite.Nil(err) @@ -792,6 +799,7 @@ func (suite *backupRestoreTestSuite) TestBackupPartitionList() { ctx, bCfg, 
&dst, + nil, ) suite.NotNil(bh) suite.Nil(err) @@ -1081,6 +1089,7 @@ func (suite *backupRestoreTestSuite) TestBackupAfterDigestOk() { ctx, backupConfig, &dst, + nil, ) suite.Nil(err) suite.NotNil(bh) @@ -1103,6 +1112,128 @@ func (suite *backupRestoreTestSuite) TestBackupEstimateOk() { suite.TearDownTest() } +func (suite *backupRestoreTestSuite) TestBackupContinuation() { + const totalRecords = 900 + batch := genRecords(suite.namespace, suite.set, totalRecords, testBins) + suite.SetupTest(batch) + + testFolder := suite.T().TempDir() + testFile := "test_state_file" + + for i := 0; i < 5; i++ { + randomNumber := rand.Intn(7-3+1) + 3 + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(time.Duration(randomNumber) * time.Second) + cancel() + }() + + first := suite.runFirstBackup(ctx, testFolder, testFile, i) + + ctx = context.Background() + second := suite.runContinueBackup(ctx, testFolder, testFile, i) + + suite.T().Log("first:", first, "second:", second) + result := (first + second) >= totalRecords + suite.T().Log(first + second) + suite.Equal(true, result) + } + + suite.TearDownTest() +} + +func (suite *backupRestoreTestSuite) runFirstBackup(ctx context.Context, testFolder, testStateFile string, i int, +) uint64 { + bFolder := fmt.Sprintf("%s_%d", testFolder, i) + + writers, err := local.NewWriter( + ctx, + local.WithValidator(asb.NewValidator()), + local.WithSkipDirCheck(), + local.WithDir(bFolder), + ) + if err != nil { + panic(err) + } + + readers, err := local.NewReader( + local.WithDir(bFolder), + ) + if err != nil { + panic(err) + } + + backupCfg := backup.NewDefaultBackupConfig() + backupCfg.Namespace = suite.namespace + backupCfg.ParallelRead = 10 + backupCfg.ParallelWrite = 10 + + backupCfg.StateFile = testStateFile + backupCfg.FileLimit = 100000 + backupCfg.Bandwidth = 1000000 + backupCfg.PageSize = 100 + backupCfg.SyncPipelines = true + + backupHandler, err := suite.backupClient.Backup(ctx, backupCfg, writers, readers) + if err != nil { + panic(err) + } + + // use backupHandler.Wait() to wait for the job to finish or fail + err = backupHandler.Wait(ctx) + if err != nil { + suite.T().Logf("Backup failed: %v", err) + } + + return backupHandler.GetStats().GetReadRecords() +} + +func (suite *backupRestoreTestSuite) runContinueBackup(ctx context.Context, testFolder, testStateFile string, i int, +) uint64 { + bFolder := fmt.Sprintf("%s_%d", testFolder, i) + + writers, err := local.NewWriter( + ctx, + local.WithValidator(asb.NewValidator()), + local.WithSkipDirCheck(), + local.WithDir(bFolder), + ) + if err != nil { + panic(err) + } + + readers, err := local.NewReader( + local.WithDir(bFolder), + ) + if err != nil { + panic(err) + } + + backupCfg := backup.NewDefaultBackupConfig() + backupCfg.Namespace = suite.namespace + backupCfg.ParallelRead = 10 + backupCfg.ParallelWrite = 10 + + backupCfg.StateFile = testStateFile + backupCfg.Continue = true + backupCfg.FileLimit = 100000 + backupCfg.PageSize = 100 + backupCfg.SyncPipelines = true + + backupHandler, err := suite.backupClient.Backup(ctx, backupCfg, writers, readers) + if err != nil { + panic(err) + } + + // use backupHandler.Wait() to wait for the job to finish or fail + err = backupHandler.Wait(ctx) + if err != nil { + suite.T().Logf("Backup failed: %v", err) + } + + return backupHandler.GetStats().GetReadRecords() +} + type byteReadWriterFactory struct { buffer *bytes.Buffer } @@ -1113,6 +1244,12 @@ func (b *byteReadWriterFactory) StreamFiles(_ context.Context, readersCh chan<- close(readersCh) } 
+func (b *byteReadWriterFactory) StreamFile(_ context.Context, _ string, readersCh chan<- io.ReadCloser, _ chan<- error) { + reader := io.NopCloser(bytes.NewReader(b.buffer.Bytes())) + readersCh <- reader + close(readersCh) +} + func (b *byteReadWriterFactory) OpenFile(_ context.Context, _ string, readersCh chan<- io.ReadCloser, _ chan<- error) { reader := io.NopCloser(bytes.NewReader(b.buffer.Bytes())) readersCh <- reader diff --git a/writers.go b/writers.go index 2f7b8a08..7ca31036 100644 --- a/writers.go +++ b/writers.go @@ -85,21 +85,32 @@ func (tw *tokenStatsWriter) Close() error { // It writes the types from the models package as encoded data // to an io.Writer. It uses an Encoder to encode the data. type tokenWriter struct { - encoder Encoder - output io.Writer - logger *slog.Logger + encoder Encoder + output io.Writer + logger *slog.Logger + recordsStateChan chan<- models.PartitionFilterSerialized + // Number of writer. + n int } // newTokenWriter creates a new tokenWriter. -func newTokenWriter(encoder Encoder, output io.Writer, logger *slog.Logger) *tokenWriter { +func newTokenWriter( + encoder Encoder, + output io.Writer, + logger *slog.Logger, + recordsStateChan chan<- models.PartitionFilterSerialized, + n int, +) *tokenWriter { id := uuid.NewString() logger = logging.WithWriter(logger, id, logging.WriterTypeToken) logger.Debug("created new token writer") return &tokenWriter{ - encoder: encoder, - output: output, - logger: logger, + encoder: encoder, + output: output, + logger: logger, + recordsStateChan: recordsStateChan, + n: n, } } @@ -110,6 +121,12 @@ func (w *tokenWriter) Write(v *models.Token) (int, error) { return 0, fmt.Errorf("error encoding token: %w", err) } + if w.recordsStateChan != nil && v.Filter != nil { + // Set worker number. 
+ v.Filter.N = w.n + w.recordsStateChan <- *v.Filter + } + return w.output.Write(data) } diff --git a/writers_test.go b/writers_test.go index 4dcce0b3..5517be74 100644 --- a/writers_test.go +++ b/writers_test.go @@ -49,7 +49,7 @@ func (suite *writersTestSuite) TestTokenWriter() { }, }, } - recToken := models.NewRecordToken(expRecord, 0) + recToken := models.NewRecordToken(expRecord, 0, nil) expUDF := &models.UDF{ Name: "udf", @@ -70,7 +70,7 @@ func (suite *writersTestSuite) TestTokenWriter() { mockEncoder.EXPECT().EncodeToken(invalidToken).Return(nil, errors.New("error")) dst := bytes.Buffer{} - writer := newTokenWriter(mockEncoder, &dst, slog.Default()) + writer := newTokenWriter(mockEncoder, &dst, slog.Default(), nil, -1) suite.NotNil(writer) _, err := writer.Write(recToken) @@ -92,7 +92,7 @@ func (suite *writersTestSuite) TestTokenWriter() { failRec := &models.Record{ Record: &a.Record{}, } - failRecToken := models.NewRecordToken(failRec, 0) + failRecToken := models.NewRecordToken(failRec, 0, nil) mockEncoder.EXPECT().EncodeToken(failRecToken).Return(nil, errors.New("error")) _, err = writer.Write(failRecToken) suite.NotNil(err) @@ -103,7 +103,7 @@ func (suite *writersTestSuite) TestTokenWriter() { func (suite *writersTestSuite) TestTokenStatsWriter() { mockWriter := pipemocks.NewMockDataWriter[*models.Token](suite.T()) - mockWriter.EXPECT().Write(models.NewRecordToken(&models.Record{}, 0)).Return(1, nil) + mockWriter.EXPECT().Write(models.NewRecordToken(&models.Record{}, 0, nil)).Return(1, nil) mockWriter.EXPECT().Write(models.NewSIndexToken(&models.SIndex{}, 0)).Return(1, nil) mockWriter.EXPECT().Write(models.NewUDFToken(&models.UDF{}, 0)).Return(1, nil) mockWriter.EXPECT().Write(&models.Token{Type: models.TokenTypeInvalid}).Return(0, errors.New("error")) @@ -116,7 +116,7 @@ func (suite *writersTestSuite) TestTokenStatsWriter() { writer := newWriterWithTokenStats(mockWriter, mockStats, slog.Default()) suite.NotNil(writer) - _, err := writer.Write(models.NewRecordToken(&models.Record{}, 0)) + _, err := writer.Write(models.NewRecordToken(&models.Record{}, 0, nil)) suite.Nil(err) _, err = writer.Write(models.NewSIndexToken(&models.SIndex{}, 0))
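
Putting the pieces together, the new state file support makes backups resumable: partition filters are periodically serialized through the state service, and a second run with `Continue` set picks up from them. A rough end-to-end sketch modeled on the integration test above (client setup, folder and state-file names are placeholders):

```go
package main

import (
	"context"

	backup "github.com/aerospike/backup-go"
	"github.com/aerospike/backup-go/io/encoding/asb"
	"github.com/aerospike/backup-go/io/local"
)

// runResumableBackup starts (or resumes) a backup that keeps a state file
// in the backup directory. Pass resume=true to continue an interrupted run.
func runResumableBackup(ctx context.Context, client *backup.Client, resume bool) error {
	writer, err := local.NewWriter(
		ctx,
		local.WithSkipDirCheck(),
		local.WithValidator(asb.NewValidator()),
		local.WithDir("backups_folder"),
	)
	if err != nil {
		return err
	}

	// The reader is only used to load the state file on continuation runs.
	reader, err := local.NewReader(local.WithDir("backups_folder"))
	if err != nil {
		return err
	}

	cfg := backup.NewDefaultBackupConfig()
	cfg.Namespace = "test"
	cfg.ParallelRead = 10
	cfg.ParallelWrite = 10
	cfg.StateFile = "backup_state_file" // enables periodic state dumps
	cfg.FileLimit = 100000              // state is saved whenever a file is rotated
	cfg.PageSize = 100                  // records read per scan page
	cfg.SyncPipelines = true            // continuation runs use synced pipelines
	cfg.Continue = resume               // true on the resume run

	handler, err := client.Backup(ctx, cfg, writer, reader)
	if err != nil {
		return err
	}

	return handler.Wait(ctx)
}
```
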