diff --git a/README.md b/README.md
index 39640280..057f9c6c 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,7 @@ func main() {
 	// For backup to single file use local.WithFile(fileName)
 	writers, err := local.NewWriter(
+		context.Background(),
 		local.WithRemoveFiles(),
 		local.WithDir("backups_folder"),
 	)
@@ -52,10 +53,11 @@ func main() {
 	backupCfg := backup.NewDefaultBackupConfig()
 	backupCfg.Namespace = "test"
-	backupCfg.Parallel = 5
+	backupCfg.ParallelRead = 10
+	backupCfg.ParallelWrite = 10
 
 	ctx := context.Background()
 
-	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers)
+	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
 	if err != nil {
 		panic(err)
 	}
diff --git a/client.go b/client.go
index c0d14dad..c8723ebe 100644
--- a/client.go
+++ b/client.go
@@ -218,6 +218,7 @@ func (c *Client) Backup(
 	if err != nil {
 		return nil, fmt.Errorf("failed to create backup handler: %w", err)
 	}
+	handler.run()
 
 	return handler, nil
diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go
index 87a91418..31a6650f 100644
--- a/cmd/internal/app/asbackup.go
+++ b/cmd/internal/app/asbackup.go
@@ -30,6 +30,8 @@ type ASBackup struct {
 	backupClient *backup.Client
 	backupConfig *backup.BackupConfig
 	writer       backup.Writer
+	// reader is used to read state file.
+	reader backup.StreamingReader
 	// Additional params.
 	isEstimate       bool
 	estimatesSamples int64
@@ -60,6 +62,7 @@ func NewASBackup(
 	// Initializations.
 	var (
 		writer backup.Writer
+		reader backup.StreamingReader
 		err    error
 	)
 	// We initialize a writer only if output is configured.
@@ -75,6 +78,15 @@ func NewASBackup(
 		}
 	}
 
+	if backupParams.StateFileDst != "" || backupParams.Continue != "" {
+		r := &models.Restore{InputFile: backupParams.OutputFile}
+
+		reader, err = getReader(ctx, r, commonParams, awsS3, gcpStorage, azureBlob)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create reader: %w", err)
+		}
+	}
+
 	aerospikeClient, err := newAerospikeClient(clientConfig, backupParams.PreferRacks)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create aerospike client: %w", err)
 	}
@@ -100,6 +112,7 @@ func NewASBackup(
 		backupClient:     backupClient,
 		backupConfig:     backupConfig,
 		writer:           writer,
+		reader:           reader,
 		isEstimate:       backupParams.Estimate,
 		estimatesSamples: backupParams.EstimateSamples,
 	}, nil
@@ -120,7 +133,7 @@ func (b *ASBackup) Run(ctx context.Context) error {
 		printEstimateReport(estimates)
 	default:
-		h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer)
+		h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer, b.reader)
 		if err != nil {
 			return fmt.Errorf("failed to start backup: %w", err)
 		}
diff --git a/cmd/internal/app/configs.go b/cmd/internal/app/configs.go
index 93800065..c2b52fdd 100644
--- a/cmd/internal/app/configs.go
+++ b/cmd/internal/app/configs.go
@@ -59,10 +59,16 @@ func mapBackupConfig(
 	c.ParallelWrite = commonParams.Parallel
 	c.ParallelRead = commonParams.Parallel
 	// As we set --nice in MiB we must convert it to bytes
-	// TODO: make Bandwidth int64 to avoid overflow.
 	c.Bandwidth = commonParams.Nice * 1024 * 1024
 	c.Compact = backupParams.Compact
 	c.NoTTLOnly = backupParams.NoTTLOnly
+	c.StateFileDumpDuration = time.Duration(backupParams.StateFileDumpDuration) * time.Millisecond
+	c.StateFile = backupParams.StateFileDst
+
+	if backupParams.Continue != "" {
+		c.StateFile = backupParams.Continue
+		c.Continue = true
+	}
 
 	// Overwrite partitions if we use nodes.
 	if backupParams.ParallelNodes || backupParams.NodeList != "" {
@@ -135,7 +141,6 @@ func mapRestoreConfig(
 	c.WritePolicy = mapWritePolicy(restoreParams, commonParams)
 	c.InfoPolicy = mapInfoPolicy(restoreParams.TimeOut)
 	// As we set --nice in MiB we must convert it to bytes
-	// TODO: make Bandwidth int64 to avoid overflow.
 	c.Bandwidth = commonParams.Nice * 1024 * 1024
 	c.ExtraTTL = restoreParams.ExtraTTL
 	c.IgnoreRecordError = restoreParams.IgnoreRecordError
@@ -288,7 +293,6 @@ func recordExistsAction(replace, unique bool) aerospike.RecordExistsAction {
 	}
 }
 
-// TODO: why no info policy timeout is set for backup in C tool?
 func mapInfoPolicy(timeOut int64) *aerospike.InfoPolicy {
 	p := aerospike.NewInfoPolicy()
 	p.Timeout = time.Duration(timeOut) * time.Millisecond
diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go
index 5d7fb0df..0fedfc21 100644
--- a/cmd/internal/flags/backup.go
+++ b/cmd/internal/flags/backup.go
@@ -123,6 +123,20 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
 	flagSet.Int64Var(&f.EstimateSamples, "estimate-samples",
 		10000,
 		"The number of samples to take when running a backup estimate.")
+	flagSet.StringVarP(&f.Continue, "continue", "c",
+		"",
+		"Resumes an interrupted/failed backup from where it was left off, given the .state file\n"+
+			"that was generated from the interrupted/failed run.")
+	flagSet.StringVar(&f.StateFileDst, "state-file-dst",
+		"",
+		"Either a path with a file name or a directory in which the backup state file will be\n"+
+			"placed if the backup is interrupted/fails. If a path with a file name is used, that\n"+
+			"exact path is where the backup state file will be placed. If a directory is given, the\n"+
+			"backup state will be placed in the directory with name `<namespace>.asb.state`, or\n"+
+			"`<prefix>.asb.state` if `--output-file-prefix` is given.")
+	flagSet.Int64Var(&f.StateFileDumpDuration, "state-file-dump-duration",
+		10000,
+		"Interval in milliseconds for how often the state file is dumped to disk.")
 
 	return flagSet
 }
diff --git a/cmd/internal/models/backup.go b/cmd/internal/models/backup.go
index d3ed6b3e..33f48326 100644
--- a/cmd/internal/models/backup.go
+++ b/cmd/internal/models/backup.go
@@ -15,25 +15,28 @@
 package models
 
 type Backup struct {
-	OutputFile          string
-	RemoveFiles         bool
-	ModifiedBefore      string
-	ModifiedAfter       string
-	FileLimit           int64
-	AfterDigest         string
-	MaxRecords          int64
-	NoBins              bool
-	SleepBetweenRetries int
-	FilterExpression    string
-	ParallelNodes       bool
-	RemoveArtifacts     bool
-	Compact             bool
-	NodeList            string
-	NoTTLOnly           bool
-	PreferRacks         string
-	PartitionList       string
-	Estimate            bool
-	EstimateSamples     int64
+	OutputFile            string
+	RemoveFiles           bool
+	ModifiedBefore        string
+	ModifiedAfter         string
+	FileLimit             int64
+	AfterDigest           string
+	MaxRecords            int64
+	NoBins                bool
+	SleepBetweenRetries   int
+	FilterExpression      string
+	ParallelNodes         bool
+	RemoveArtifacts       bool
+	Compact               bool
+	NodeList              string
+	NoTTLOnly             bool
+	PreferRacks           string
+	PartitionList         string
+	Estimate              bool
+	EstimateSamples       int64
+	StateFileDst          string
+	StateFileDumpDuration int64
+	Continue              string
 }
 
 // ShouldClearTarget checks if we should clean the target directory.
diff --git a/config_backup.go b/config_backup.go
index b7606adb..861fb0cc 100644
--- a/config_backup.go
+++ b/config_backup.go
@@ -144,12 +144,12 @@ func (c *BackupConfig) isDefaultPartitionFilter() bool {
 
 // isStateFirstRun checks if it is the first run of a backup with a state file.
 func (c *BackupConfig) isStateFirstRun() bool {
-	return c.StateFile != "" && c.Continue == false
+	return c.StateFile != "" && !c.Continue
 }
 
 // isStateContinue checks if we continue a backup from a state file.
 func (c *BackupConfig) isStateContinue() bool {
-	return c.StateFile != "" && c.Continue == true
+	return c.StateFile != "" && c.Continue
 }
 
 func (c *BackupConfig) isFullBackup() bool {
diff --git a/examples/aws/s3/main.go b/examples/aws/s3/main.go
index da2f44d2..aa38b5c3 100644
--- a/examples/aws/s3/main.go
+++ b/examples/aws/s3/main.go
@@ -97,7 +97,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
 	// set compression policy
 	backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20)
 
-	backupHandler, err := c.Backup(ctx, backupCfg, writers)
+	backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
 	if err != nil {
 		panic(err)
 	}
diff --git a/examples/azure/blob/main.go b/examples/azure/blob/main.go
index 8dbe34b5..1f3b2af3 100644
--- a/examples/azure/blob/main.go
+++ b/examples/azure/blob/main.go
@@ -101,7 +101,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
 	// set compression policy
 	backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20)
 
-	backupHandler, err := c.Backup(ctx, backupCfg, writers)
+	backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
 	if err != nil {
 		panic(err)
 	}
diff --git a/examples/gcp/storage/main.go b/examples/gcp/storage/main.go
index 82e431a8..f3bf7fb6 100644
--- a/examples/gcp/storage/main.go
+++ b/examples/gcp/storage/main.go
@@ -92,7 +92,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
 	// set compression policy
 	backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 20)
 
-	backupHandler, err := c.Backup(ctx, backupCfg, writers)
+	backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
 	if err != nil {
 		panic(err)
 	}
diff --git a/examples/readme/main.go b/examples/readme/main.go
index 73646be9..1e0d2635 100644
--- a/examples/readme/main.go
+++ b/examples/readme/main.go
@@ -51,7 +51,7 @@ func main() {
 	backupCfg.Namespace = "test"
 	backupCfg.ParallelRead = 5
 
-	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers)
+	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
 	if err != nil {
 		panic(err)
 	}
diff --git a/handler_backup.go b/handler_backup.go
index 3b177796..657f275d 100644
--- a/handler_backup.go
+++ b/handler_backup.go
@@ -99,10 +99,18 @@ func newBackupHandler(
 	// redefine context cancel.
 	ctx, cancel := context.WithCancel(ctx)
 
-	// Keep in mind, that on continue operation, we update partitions list in config by pointer.
-	state, err := NewState(ctx, config, reader, writer, logger)
-	if err != nil {
-		return nil, err
+	var (
+		state *State
+		err   error
+	)
+
+	if config.StateFile != "" {
+		// Keep in mind that on a continue operation, we update the partition list in the config by pointer.
+		state, err = NewState(ctx, config, reader, writer, logger)
+		if err != nil {
+			cancel()
+			return nil, err
+		}
 	}
 
 	return &BackupHandler{
@@ -243,6 +251,7 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
+
 	if bh.config.isStateContinue() {
 		// Have to reload the filter: counting records moves the cursor, so subsequent scans would return nothing.
 		bh.config.PartitionFilters, err = bh.state.loadPartitionFilters()
diff --git a/handler_backup_records.go b/handler_backup_records.go
index dbcd4601..f8523181 100644
--- a/handler_backup_records.go
+++ b/handler_backup_records.go
@@ -37,8 +37,6 @@ type backupRecordsHandler struct {
 	aerospikeClient AerospikeClient
 	logger          *slog.Logger
 	scanLimiter     *semaphore.Weighted
-	// is used when AfterDigest is set.
-	afterDigest []byte
 }
 
 func newBackupRecordsHandler(
@@ -204,7 +202,9 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkersForPartition(
 	ctx context.Context, n int, scanPolicy *a.ScanPolicy,
 ) ([]pipeline.Worker[*models.Token], error) {
 	var err error
+	partitionGroups := bh.config.PartitionFilters
+
 	if !bh.config.isStateContinue() {
 		partitionGroups, err = splitPartitions(bh.config.PartitionFilters, n)
 		if err != nil {
@@ -276,6 +276,7 @@ func (bh *backupRecordsHandler) recordReaderConfigForPartitions(
 	scanPolicy *a.ScanPolicy,
 ) *aerospike.RecordReaderConfig {
 	pfCopy := *partitionFilter
+
 	return aerospike.NewRecordReaderConfig(
 		bh.config.Namespace,
 		bh.config.SetList,
diff --git a/internal/processors/change_namespace_test.go b/internal/processors/change_namespace_test.go
index 7c5aa8e4..cb250a61 100644
--- a/internal/processors/change_namespace_test.go
+++ b/internal/processors/change_namespace_test.go
@@ -45,7 +45,7 @@ func TestChangeNamespaceProcessor(t *testing.T) {
 				Record: &aerospike.Record{
 					Key: key,
 				},
-			}, 0),
+			}, 0, nil),
 			wantErr: false,
 		},
 		{
@@ -56,7 +56,7 @@ func TestChangeNamespaceProcessor(t *testing.T) {
 				Record: &aerospike.Record{
 					Key: key,
 				},
-			}, 0),
+			}, 0, nil),
 			wantErr: false,
 		},
 		{
@@ -74,7 +74,7 @@ func TestChangeNamespaceProcessor(t *testing.T) {
 				Record: &aerospike.Record{
 					Key: invalidKey,
 				},
-			}, 0),
+			}, 0, nil),
 			wantErr: true,
 		},
 		{
@@ -85,7 +85,7 @@ func TestChangeNamespaceProcessor(t *testing.T) {
 				Record: &aerospike.Record{
 					Key: key,
 				},
-			}, 0),
+			}, 0, nil),
 			wantErr: false,
 		},
 	}
diff --git a/io/aerospike/record_reader.go b/io/aerospike/record_reader.go
index d62807e4..36084780 100644
--- a/io/aerospike/record_reader.go
+++ b/io/aerospike/record_reader.go
@@ -141,8 +141,13 @@ func (r *RecordReader) Read() (*models.Token, error) {
 	rec := models.Record{
 		Record: res.Record,
 	}
-	// TODO: check how accurate is value of filter at this moment.
-	recToken := models.NewRecordToken(&rec, 0, *r.config.partitionFilter)
+
+	pfs, err := models.NewPartitionFilterSerialized(r.config.partitionFilter)
+	if err != nil {
+		return nil, fmt.Errorf("failed to serialize partition filter: %w", err)
+	}
+
+	recToken := models.NewRecordToken(&rec, 0, pfs)
 
 	return recToken, nil
 }
diff --git a/io/aerospike/record_reader_test.go b/io/aerospike/record_reader_test.go
index 56496309..90f51513 100644
--- a/io/aerospike/record_reader_test.go
+++ b/io/aerospike/record_reader_test.go
@@ -85,7 +85,7 @@ func (suite *readersTestSuite) TestAerospikeRecordReader() {
 	v, err := reader.Read()
 	suite.Nil(err)
 
-	expectedRecToken := models.NewRecordToken(mockRec, 0)
+	expectedRecToken := models.NewRecordToken(mockRec, 0, nil)
 	suite.Equal(expectedRecToken, v)
 	mockScanner.AssertExpectations(suite.T())
 }
@@ -273,7 +273,7 @@ func (suite *readersTestSuite) TestAerospikeRecordReaderWithPolicy() {
 	v, err := reader.Read()
 	suite.Nil(err)
 
-	expectedRecToken := models.NewRecordToken(mockRec, 0)
+	expectedRecToken := models.NewRecordToken(mockRec, 0, nil)
 	suite.Equal(expectedRecToken, v)
 	mockScanner.AssertExpectations(suite.T())
 }
diff --git a/io/encoding/asb/decode.go b/io/encoding/asb/decode.go
index ea415103..af8a2c5e 100644
--- a/io/encoding/asb/decode.go
+++ b/io/encoding/asb/decode.go
@@ -161,7 +161,7 @@ func (r *Decoder) NextToken() (*models.Token, error) {
 	case *models.UDF:
 		return models.NewUDFToken(v, size), nil
 	case *models.Record:
-		return models.NewRecordToken(v, size, a.PartitionFilter{}), nil
+		return models.NewRecordToken(v, size, nil), nil
 	default:
 		return nil, fmt.Errorf("unsupported token type %T", v)
 	}
diff --git a/io/encoding/asb/decode_test.go b/io/encoding/asb/decode_test.go
index e0e397e3..3f07da67 100644
--- a/io/encoding/asb/decode_test.go
+++ b/io/encoding/asb/decode_test.go
@@ -3718,7 +3718,7 @@ func TestASBReader_NextToken(t *testing.T) {
 					Generation: 10,
 				},
 				VoidTime: 10,
-			}, 106),
+			}, 106, nil),
 		},
 		{
 			name: "negative EOF",
diff --git a/io/local/options.go b/io/local/options.go
index 4322ad6f..2fd31e86 100644
--- a/io/local/options.go
+++ b/io/local/options.go
@@ -29,6 +29,8 @@ type options struct {
 	withNestedDir bool
 	// unbuffered means that writes to disk will be unbuffered.
 	unbuffered bool
+	// skipDirCheck if true, the backup directory won't be checked.
+	skipDirCheck bool
 }
 
 type Opt func(*options)
@@ -79,3 +81,11 @@ func WithUnbufferedWrite() Opt {
 		r.unbuffered = true
 	}
 }
+
+// WithSkipDirCheck sets the skipDirCheck flag,
+// which means the backup directory won't be checked for emptiness.
+func WithSkipDirCheck() Opt {
+	return func(r *options) {
+		r.skipDirCheck = true
+	}
+}
diff --git a/io/local/writer.go b/io/local/writer.go
index 093dad22..71b222ff 100644
--- a/io/local/writer.go
+++ b/io/local/writer.go
@@ -53,17 +53,17 @@ func NewWriter(ctx context.Context, opts ...Opt) (*Writer, error) {
 		return nil, fmt.Errorf("failed to prepare backup directory: %w", err)
 	}
 
-	// if w.isDir {
-	//	// Check if backup dir is empty.
-	//	isEmpty, err := isEmptyDirectory(w.path)
-	//	if err != nil {
-	//		return nil, fmt.Errorf("failed to check if directory is empty: %w", err)
-	//	}
-	//
-	//	if !isEmpty && !w.isRemovingFiles {
-	//		return nil, fmt.Errorf("backup folder must be empty or set RemoveFiles = true")
-	//	}
-	// }
+	if w.isDir && !w.skipDirCheck {
+		// Check if backup dir is empty.
+		isEmpty, err := isEmptyDirectory(w.path)
+		if err != nil {
+			return nil, fmt.Errorf("failed to check if directory is empty: %w", err)
+		}
+
+		if !isEmpty && !w.isRemovingFiles {
+			return nil, fmt.Errorf("backup folder must be empty or set RemoveFiles = true")
+		}
+	}
 
 	// If we want to remove files from backup path.
 	if w.isRemovingFiles {
diff --git a/mocks/Encoder_mock.go b/mocks/Encoder_mock.go
index 8f352f7a..98b6d1c6 100644
--- a/mocks/Encoder_mock.go
+++ b/mocks/Encoder_mock.go
@@ -78,17 +78,17 @@ func (_c *MockEncoder_EncodeToken_Call) RunAndReturn(run func(*models.Token) ([]byte, error)) *MockEncoder_EncodeToken_Call {
 	return _c
 }
 
-// GenerateFilename provides a mock function with given fields:
-func (_m *MockEncoder) GenerateFilename() string {
-	ret := _m.Called()
+// GenerateFilename provides a mock function with given fields: prefix
+func (_m *MockEncoder) GenerateFilename(prefix string) string {
+	ret := _m.Called(prefix)
 
 	if len(ret) == 0 {
 		panic("no return value specified for GenerateFilename")
 	}
 
 	var r0 string
-	if rf, ok := ret.Get(0).(func() string); ok {
-		r0 = rf()
+	if rf, ok := ret.Get(0).(func(string) string); ok {
+		r0 = rf(prefix)
 	} else {
 		r0 = ret.Get(0).(string)
 	}
@@ -102,13 +102,14 @@ type MockEncoder_GenerateFilename_Call struct {
 }
 
 // GenerateFilename is a helper method to define mock.On call
-func (_e *MockEncoder_Expecter) GenerateFilename() *MockEncoder_GenerateFilename_Call {
-	return &MockEncoder_GenerateFilename_Call{Call: _e.mock.On("GenerateFilename")}
+//   - prefix string
+func (_e *MockEncoder_Expecter) GenerateFilename(prefix interface{}) *MockEncoder_GenerateFilename_Call {
+	return &MockEncoder_GenerateFilename_Call{Call: _e.mock.On("GenerateFilename", prefix)}
 }
 
-func (_c *MockEncoder_GenerateFilename_Call) Run(run func()) *MockEncoder_GenerateFilename_Call {
+func (_c *MockEncoder_GenerateFilename_Call) Run(run func(prefix string)) *MockEncoder_GenerateFilename_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run()
+		run(args[0].(string))
 	})
 	return _c
 }
@@ -118,7 +119,7 @@ func (_c *MockEncoder_GenerateFilename_Call) Return(_a0 string) *MockEncoder_GenerateFilename_Call {
 	return _c
 }
 
-func (_c *MockEncoder_GenerateFilename_Call) RunAndReturn(run func() string) *MockEncoder_GenerateFilename_Call {
+func (_c *MockEncoder_GenerateFilename_Call) RunAndReturn(run func(string) string) *MockEncoder_GenerateFilename_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/mocks/StreamingReader_mock.go b/mocks/StreamingReader_mock.go
index 1d0c17f7..d24b1ddd 100644
--- a/mocks/StreamingReader_mock.go
+++ b/mocks/StreamingReader_mock.go
@@ -67,6 +67,42 @@ func (_c *MockStreamingReader_GetType_Call) RunAndReturn(run func() string) *MockStreamingReader_GetType_Call {
 	return _c
 }
 
+// StreamFile provides a mock function with given fields: ctx, filename, readersCh, errorsCh
+func (_m *MockStreamingReader) StreamFile(ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) {
+	_m.Called(ctx, filename, readersCh, errorsCh)
+}
+
+// MockStreamingReader_StreamFile_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamFile'
+type MockStreamingReader_StreamFile_Call struct {
+	*mock.Call
+}
+
+// StreamFile is a helper method to define mock.On call
+//   - ctx context.Context
+//   - filename string
+//   - readersCh chan<- io.ReadCloser
+//   - errorsCh chan<- error
+func (_e *MockStreamingReader_Expecter) StreamFile(ctx interface{}, filename interface{}, readersCh interface{}, errorsCh interface{}) *MockStreamingReader_StreamFile_Call {
_e.mock.On("StreamFile", ctx, filename, readersCh, errorsCh)} +} + +func (_c *MockStreamingReader_StreamFile_Call) Run(run func(ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error)) *MockStreamingReader_StreamFile_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(chan<- io.ReadCloser), args[3].(chan<- error)) + }) + return _c +} + +func (_c *MockStreamingReader_StreamFile_Call) Return() *MockStreamingReader_StreamFile_Call { + _c.Call.Return() + return _c +} + +func (_c *MockStreamingReader_StreamFile_Call) RunAndReturn(run func(context.Context, string, chan<- io.ReadCloser, chan<- error)) *MockStreamingReader_StreamFile_Call { + _c.Call.Return(run) + return _c +} + // StreamFiles provides a mock function with given fields: _a0, _a1, _a2 func (_m *MockStreamingReader) StreamFiles(_a0 context.Context, _a1 chan<- io.ReadCloser, _a2 chan<- error) { _m.Called(_a0, _a1, _a2) diff --git a/mocks/Writer_mock.go b/mocks/Writer_mock.go index d83e6ff4..1642431f 100644 --- a/mocks/Writer_mock.go +++ b/mocks/Writer_mock.go @@ -126,6 +126,52 @@ func (_c *MockWriter_NewWriter_Call) RunAndReturn(run func(context.Context, stri return _c } +// RemoveFiles provides a mock function with given fields: ctx +func (_m *MockWriter) RemoveFiles(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for RemoveFiles") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWriter_RemoveFiles_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveFiles' +type MockWriter_RemoveFiles_Call struct { + *mock.Call +} + +// RemoveFiles is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockWriter_Expecter) RemoveFiles(ctx interface{}) *MockWriter_RemoveFiles_Call { + return &MockWriter_RemoveFiles_Call{Call: _e.mock.On("RemoveFiles", ctx)} +} + +func (_c *MockWriter_RemoveFiles_Call) Run(run func(ctx context.Context)) *MockWriter_RemoveFiles_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockWriter_RemoveFiles_Call) Return(_a0 error) *MockWriter_RemoveFiles_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriter_RemoveFiles_Call) RunAndReturn(run func(context.Context) error) *MockWriter_RemoveFiles_Call { + _c.Call.Return(run) + return _c +} + // NewMockWriter creates a new instance of MockWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockWriter(t interface { diff --git a/models/data_models.go b/models/data_models.go index 500d9836..11f2959f 100644 --- a/models/data_models.go +++ b/models/data_models.go @@ -101,13 +101,12 @@ type Token struct { Record *Record Type TokenType Size uint64 - // Current filter state. Must copy this value. - Filter a.PartitionFilter - FileName string + // Current filter state. + Filter *PartitionFilterSerialized } // NewRecordToken creates a new token with the given record. 
-func NewRecordToken(r *Record, size uint64, filter a.PartitionFilter) *Token {
+func NewRecordToken(r *Record, size uint64, filter *PartitionFilterSerialized) *Token {
 	return &Token{
 		Record: r,
 		Type:   TokenTypeRecord,
diff --git a/models/partition_filter_serialized.go b/models/partition_filter_serialized.go
new file mode 100644
index 00000000..5a511a92
--- /dev/null
+++ b/models/partition_filter_serialized.go
@@ -0,0 +1,55 @@
+// Copyright 2024 Aerospike, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package models
+
+import (
+	"fmt"
+
+	a "github.com/aerospike/aerospike-client-go/v7"
+)
+
+// PartitionFilterSerialized represents a serialized a.PartitionFilter,
+// used to save the cursor state.
+type PartitionFilterSerialized struct {
+	Begin  int
+	Count  int
+	Digest []byte
+	Cursor []byte
+}
+
+// NewPartitionFilterSerialized serializes an *a.PartitionFilter and returns
+// a new PartitionFilterSerialized instance.
+func NewPartitionFilterSerialized(pf *a.PartitionFilter) (*PartitionFilterSerialized, error) {
+	c, err := pf.EncodeCursor()
+	if err != nil {
+		return nil, fmt.Errorf("failed to encode cursor: %w", err)
+	}
+
+	return &PartitionFilterSerialized{
+		Begin:  pf.Begin,
+		Count:  pf.Count,
+		Digest: pf.Digest,
+		Cursor: c,
+	}, nil
+}
+
+// Decode decodes a *PartitionFilterSerialized into an *a.PartitionFilter.
+func (p *PartitionFilterSerialized) Decode() (*a.PartitionFilter, error) {
+	pf := &a.PartitionFilter{Begin: p.Begin, Count: p.Count, Digest: p.Digest}
+	if err := pf.DecodeCursor(p.Cursor); err != nil {
+		return nil, fmt.Errorf("failed to decode cursor: %w", err)
+	}
+
+	return pf, nil
+}
diff --git a/state.go b/state.go
index 2352951f..19aa16d2 100644
--- a/state.go
+++ b/state.go
@@ -19,12 +19,12 @@ import (
 	"encoding/gob"
 	"fmt"
 	"io"
-	"log"
 	"log/slog"
 	"sync"
 	"time"
 
 	a "github.com/aerospike/aerospike-client-go/v7"
+	"github.com/aerospike/backup-go/models"
 )
 
 // State contains the current backup status data.
@@ -36,9 +36,9 @@ type State struct {
 	// Is used to create prefix for backup files.
 	Counter int
 	// RecordsChan communication channel to save current filter state.
-	RecordsChan chan recordState
+	RecordsChan chan *models.PartitionFilterSerialized
 	// RecordStates store states of all filters.
-	RecordStates map[string]recordState
+	RecordStates map[string]*models.PartitionFilterSerialized
 	// Mutex for RecordStates operations.
 	// Ordinary mutex is used, because we must not allow any writes while we read the state.
 	mu sync.Mutex
@@ -53,50 +53,6 @@ type State struct {
 	logger *slog.Logger
 }
 
-type recordState struct {
-	Filter filter
-}
-
-// filter contains custom filter struct to save filter to GOB.
-type filter struct {
-	Begin  int
-	Count  int
-	Digest []byte
-	Cursor []byte
-}
-
-func mapToFilter(pf a.PartitionFilter) (filter, error) {
-	c, err := pf.EncodeCursor()
-	if err != nil {
-		return filter{}, fmt.Errorf("failed to encode cursor: %w", err)
-	}
-
-	return filter{
-		Begin:  pf.Begin,
-		Count:  pf.Count,
-		Digest: pf.Digest,
-		Cursor: c,
-	}, nil
-}
-
-func mapFromFilter(f filter) (*a.PartitionFilter, error) {
-	pf := &a.PartitionFilter{Begin: f.Begin, Count: f.Count, Digest: f.Digest}
-	if err := pf.DecodeCursor(f.Cursor); err != nil {
-		return nil, fmt.Errorf("failed to decode cursor: %w", err)
-	}
-
-	return pf, nil
-}
-
-func newRecordState(filter a.PartitionFilter) recordState {
-	f, err := mapToFilter(filter)
-	if err != nil {
-		log.Fatalf("failed to map partition filter: %w", err)
-	}
-
-	return recordState{
-		Filter: f,
-	}
-}
-
 // NewState returns a new state instance depending on the config.
 // If we continue a backup, the state will be loaded from a state file;
 // if it is the first operation, a new state instance will be returned.
@@ -116,11 +72,11 @@ func NewState(
 		return nil, err
 	}
 	// change filters in config.
 	config.PartitionFilters, err = s.loadPartitionFilters()
 	if err != nil {
 		return nil, err
 	}
+
 	return s, nil
 }
 
@@ -134,12 +90,11 @@ func newState(
 	writer Writer,
 	logger *slog.Logger,
 ) *State {
-
 	s := &State{
 		ctx: ctx,
 		// RecordsChan must not be buffered, so we can stop all operations.
-		RecordsChan:  make(chan recordState),
-		RecordStates: make(map[string]recordState),
+		RecordsChan:  make(chan *models.PartitionFilterSerialized),
+		RecordStates: make(map[string]*models.PartitionFilterSerialized),
 		FileName:     config.StateFile,
 		DumpDuration: config.StateFileDumpDuration,
 		writer:       writer,
@@ -175,7 +130,7 @@ func newStateFromFile(
 	s.ctx = ctx
 	s.writer = writer
 	s.logger = logger
-	s.RecordsChan = make(chan recordState)
+	s.RecordsChan = make(chan *models.PartitionFilterSerialized)
 	s.Counter++
 
 	logger.Debug("loaded state file successfully")
@@ -228,6 +183,7 @@ func (s *State) dump() error {
 	}
 
 	enc := gob.NewEncoder(file)
+
 	s.mu.Lock()
 	if err = enc.Encode(s); err != nil {
 		return fmt.Errorf("failed to encode state data: %w", err)
 	}
@@ -244,8 +200,9 @@ func (s *State) loadPartitionFilters() ([]*a.PartitionFilter, error) {
 	s.mu.Lock()
 
 	result := make([]*a.PartitionFilter, 0, len(s.RecordStates))
+
 	for _, state := range s.RecordStates {
-		f, err := mapFromFilter(state.Filter)
+		f, err := state.Decode()
 		if err != nil {
 			return nil, err
 		}
@@ -259,24 +216,18 @@ func (s *State) serveRecords() {
-	var counter int
+
 	for {
 		select {
 		case <-s.ctx.Done():
 			return
 		case state := <-s.RecordsChan:
-			counter++
-
+			if state == nil {
+				continue
+			}
+
 			s.mu.Lock()
-			key := fmt.Sprintf("%d%d%s", state.Filter.Begin, state.Filter.Count, state.Filter.Digest)
+			key := fmt.Sprintf("%d%d%s", state.Begin, state.Count, state.Digest)
 			s.RecordStates[key] = state
 			s.mu.Unlock()
-
-			// For tests:
-			// ----------
-			// if counter == 1000 {
-			//	s.dump()
-			//	fmt.Println("done 4000000")
-			//	os.Exit(1)
-			// }
 		}
 	}
 }
@@ -294,7 +248,9 @@ func openFile(ctx context.Context, reader StreamingReader, fileName string) (io.ReadCloser, error) {
 	readCh := make(chan io.ReadCloser)
 	errCh := make(chan error)
+
 	go reader.StreamFile(ctx, fileName, readCh, errCh)
+
 	for {
 		select {
 		case <-ctx.Done():
diff --git a/state_test.go b/state_test.go
index 086c5411..51ee55a2 100644
--- a/state_test.go
+++ b/state_test.go
@@ -13,56 +13,3 @@
 // limitations under the License.
 
 package backup
-
-import (
-	"context"
-	"log/slog"
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-
-	a "github.com/aerospike/aerospike-client-go/v7"
-	"github.com/stretchr/testify/require"
-)
-
-const (
-	testDuration = 1 * time.Second
-)
-
-func TestState(t *testing.T) {
-	t.Parallel()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	tempFile := filepath.Join(t.TempDir(), "state_test.gob")
-	pfs := []*a.PartitionFilter{
-		NewPartitionFilterByID(1),
-		NewPartitionFilterByID(2),
-	}
-	logger := slog.New(slog.NewTextHandler(nil, nil))
-
-	// Check init.
-	state := NewState(ctx, tempFile, testDuration, pfs, logger)
-
-	time.Sleep(testDuration * 3)
-
-	require.NotZero(t, state.SavedAt)
-	cancel()
-
-	// Check that file exists.
-	_, err := os.Stat(tempFile)
-	require.NoError(t, err)
-
-	// Nullify the link.
-	pfs = nil
-	result := []*a.PartitionFilter{
-		NewPartitionFilterByID(1),
-		NewPartitionFilterByID(2),
-	}
-
-	// Check restore.
-	newCtx := context.Background()
-	newState, err := NewStateFromFile(newCtx, tempFile, logger)
-	require.NoError(t, err)
-	require.Equal(t, newState.PartitionFilters, result)
-}
diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go
index d87b41a6..5934e391 100644
--- a/tests/integration/integration_test.go
+++ b/tests/integration/integration_test.go
@@ -238,6 +238,7 @@ func runBackupRestore(suite *backupRestoreTestSuite, backupConfig *backup.BackupConfig,
 		ctx,
 		backupConfig,
 		&dst,
+		nil,
 	)
 	suite.Nil(err)
 	suite.NotNil(bh)
@@ -384,6 +385,7 @@ func runBackupRestoreDirectory(suite *backupRestoreTestSuite,
 		ctx,
 		backupConfig,
 		writers,
+		nil,
 	)
 	suite.Nil(err)
 	suite.NotNil(bh)
@@ -463,7 +465,7 @@ func (suite *backupRestoreTestSuite) TestRestoreExpiredRecords() {
 			VoidTime: 1,
 		}
 
-		token := models.NewRecordToken(modelRec, 0)
+		token := models.NewRecordToken(modelRec, 0, nil)
 		v, err := encoder.EncodeToken(token)
 		if err != nil {
 			suite.FailNow(err.Error())
@@ -556,6 +558,7 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreIOWithPartitions() {
 		ctx,
 		backupConfig,
 		writers,
+		nil,
 	)
 	suite.Nil(err)
 	suite.NotNil(bh)
@@ -595,6 +598,7 @@ func (suite *backupRestoreTestSuite) TestBackupContext() {
 		ctx,
 		backup.NewDefaultBackupConfig(),
 		&writer,
+		nil,
 	)
 	suite.NotNil(bh)
 	suite.Nil(err)
@@ -746,6 +750,7 @@ func (suite *backupRestoreTestSuite) TestBackupParallelNodes() {
 		ctx,
 		bCfg,
 		&dst,
+		nil,
 	)
 	suite.NotNil(bh)
 	suite.Nil(err)
@@ -763,6 +768,7 @@ func (suite *backupRestoreTestSuite) TestBackupParallelNodesList() {
 		ctx,
 		bCfg,
 		&dst,
+		nil,
 	)
 	suite.NotNil(bh)
 	suite.Nil(err)
@@ -792,6 +798,7 @@ func (suite *backupRestoreTestSuite) TestBackupPartitionList() {
 		ctx,
 		bCfg,
 		&dst,
+		nil,
 	)
 	suite.NotNil(bh)
 	suite.Nil(err)
@@ -1081,6 +1088,7 @@ func (suite *backupRestoreTestSuite) TestBackupAfterDigestOk() {
 		ctx,
 		backupConfig,
 		&dst,
+		nil,
 	)
 	suite.Nil(err)
 	suite.NotNil(bh)
@@ -1113,6 +1121,12 @@ func (b *byteReadWriterFactory) StreamFiles(_ context.Context, readersCh chan<- io.ReadCloser, _ chan<- error) {
 	close(readersCh)
 }
 
+func (b *byteReadWriterFactory) StreamFile(_ context.Context, _ string, readersCh chan<- io.ReadCloser, _ chan<- error) {
+	reader := io.NopCloser(bytes.NewReader(b.buffer.Bytes()))
+	readersCh <- reader
+	close(readersCh)
+}
+
 func (b *byteReadWriterFactory) OpenFile(_ context.Context, _ string, readersCh chan<- io.ReadCloser, _ chan<- error) {
 	reader := io.NopCloser(bytes.NewReader(b.buffer.Bytes()))
 	readersCh <- reader
diff --git a/writers.go b/writers.go
index 9d51fc6b..c8fd7f89 100644
--- a/writers.go
+++ b/writers.go
@@ -88,11 +88,16 @@ type tokenWriter struct {
 	encoder   Encoder
 	output    io.Writer
 	logger    *slog.Logger
-	stateChan chan<- recordState
+	stateChan chan<- *models.PartitionFilterSerialized
 }
 
 // newTokenWriter creates a new tokenWriter.
-func newTokenWriter(encoder Encoder, output io.Writer, logger *slog.Logger, stateChan chan<- recordState) *tokenWriter {
+func newTokenWriter(
+	encoder Encoder,
+	output io.Writer,
+	logger *slog.Logger,
+	stateChan chan<- *models.PartitionFilterSerialized,
+) *tokenWriter {
 	id := uuid.NewString()
 	logger = logging.WithWriter(logger, id, logging.WriterTypeToken)
 	logger.Debug("created new token writer")
@@ -112,7 +117,9 @@ func (w *tokenWriter) Write(v *models.Token) (int, error) {
 		return 0, fmt.Errorf("error encoding token: %w", err)
 	}
 
-	w.stateChan <- newRecordState(v.Filter)
+	if w.stateChan != nil {
+		w.stateChan <- v.Filter
+	}
 
 	return w.output.Write(data)
 }
diff --git a/writers_test.go b/writers_test.go
index 4dcce0b3..4d78ef23 100644
--- a/writers_test.go
+++ b/writers_test.go
@@ -49,7 +49,7 @@ func (suite *writersTestSuite) TestTokenWriter() {
 			},
 		},
 	}
-	recToken := models.NewRecordToken(expRecord, 0)
+	recToken := models.NewRecordToken(expRecord, 0, nil)
 
 	expUDF := &models.UDF{
 		Name: "udf",
@@ -70,7 +70,7 @@ func (suite *writersTestSuite) TestTokenWriter() {
 	mockEncoder.EXPECT().EncodeToken(invalidToken).Return(nil, errors.New("error"))
 
 	dst := bytes.Buffer{}
-	writer := newTokenWriter(mockEncoder, &dst, slog.Default())
+	writer := newTokenWriter(mockEncoder, &dst, slog.Default(), nil)
 	suite.NotNil(writer)
 
 	_, err := writer.Write(recToken)
@@ -92,7 +92,7 @@ func (suite *writersTestSuite) TestTokenWriter() {
 	failRec := &models.Record{
 		Record: &a.Record{},
 	}
-	failRecToken := models.NewRecordToken(failRec, 0)
+	failRecToken := models.NewRecordToken(failRec, 0, nil)
 	mockEncoder.EXPECT().EncodeToken(failRecToken).Return(nil, errors.New("error"))
 	_, err = writer.Write(failRecToken)
 	suite.NotNil(err)
@@ -103,7 +103,7 @@ func (suite *writersTestSuite) TestTokenWriter() {
 
 func (suite *writersTestSuite) TestTokenStatsWriter() {
 	mockWriter := pipemocks.NewMockDataWriter[*models.Token](suite.T())
-	mockWriter.EXPECT().Write(models.NewRecordToken(&models.Record{}, 0)).Return(1, nil)
+	mockWriter.EXPECT().Write(models.NewRecordToken(&models.Record{}, 0, nil)).Return(1, nil)
 	mockWriter.EXPECT().Write(models.NewSIndexToken(&models.SIndex{}, 0)).Return(1, nil)
 	mockWriter.EXPECT().Write(models.NewUDFToken(&models.UDF{}, 0)).Return(1, nil)
 	mockWriter.EXPECT().Write(&models.Token{Type: models.TokenTypeInvalid}).Return(0, errors.New("error"))
@@ -116,7 +116,7 @@ func (suite *writersTestSuite) TestTokenStatsWriter() {
 	writer := newWriterWithTokenStats(mockWriter, mockStats, slog.Default())
 	suite.NotNil(writer)
 
-	_, err := writer.Write(models.NewRecordToken(&models.Record{}, 0))
+	_, err := writer.Write(models.NewRecordToken(&models.Record{}, 0, nil))
 	suite.Nil(err)
 
 	_, err = writer.Write(models.NewSIndexToken(&models.SIndex{}, 0))
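
For reference, a minimal sketch of driving the resumable-backup flow end to end. It relies only on pieces visible in this diff (the four-argument Client.Backup, the StateFile/StateFileDumpDuration/Continue config fields, and the local writer options) plus the client constructors from the repo's README example; treat exact signatures as assumptions rather than a definitive usage.

package main

import (
	"context"
	"log"
	"time"

	aero "github.com/aerospike/aerospike-client-go/v7"
	backup "github.com/aerospike/backup-go"
	"github.com/aerospike/backup-go/io/local"
)

func main() {
	ctx := context.Background()

	// Separate error variable: the Aerospike client returns its own Error type.
	aerospikeClient, aerr := aero.NewClient("127.0.0.1", 3000)
	if aerr != nil {
		log.Fatal(aerr)
	}

	backupClient, err := backup.NewClient(aerospikeClient)
	if err != nil {
		log.Fatal(err)
	}

	writers, err := local.NewWriter(
		ctx,
		local.WithRemoveFiles(),
		local.WithDir("backups_folder"),
	)
	if err != nil {
		log.Fatal(err)
	}

	backupCfg := backup.NewDefaultBackupConfig()
	backupCfg.Namespace = "test"
	// Dump the scan cursors to this state file every 10 seconds so an
	// interrupted run can be resumed later.
	backupCfg.StateFile = "backup.asb.state"
	backupCfg.StateFileDumpDuration = 10 * time.Second

	// First run: there is nothing to read back yet, so the reader argument
	// is nil. To resume an interrupted run, set backupCfg.Continue = true,
	// build the writer with local.WithSkipDirCheck() (the directory already
	// holds files from the previous attempt), and pass a
	// backup.StreamingReader that can stream the state file back in.
	handler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
	if err != nil {
		log.Fatal(err)
	}

	if err := handler.Wait(ctx); err != nil {
		log.Fatal(err)
	}
}

The state itself round-trips through the new gob-friendly PartitionFilterSerialized, which record tokens now carry instead of a raw a.PartitionFilter; under the same import assumptions (aero as above, models = github.com/aerospike/backup-go/models), a cursor snapshot looks like:

// snapshotCursor round-trips a scan cursor through its serializable form.
func snapshotCursor(pf *aero.PartitionFilter) (*aero.PartitionFilter, error) {
	pfs, err := models.NewPartitionFilterSerialized(pf) // encode cursor to plain fields
	if err != nil {
		return nil, err
	}

	return pfs.Decode() // rebuild a filter that resumes where pf left off
}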