From 7026e71c84655211645e5be3497ab46177282203 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Mon, 7 Oct 2024 17:46:22 +0300 Subject: [PATCH] FMWK-570-backup-restore-state - wip on state support --- config_partition_filter.go | 8 +- config_partition_filter_test.go | 2 +- config_policy_compression.go | 64 ++++++++++++++++ config_policy_encryption.go | 66 ++++++++++++++++ state.go | 132 ++++++++++++++++++++++++++++++++ state_test.go | 47 ++++++++++++ 6 files changed, 314 insertions(+), 5 deletions(-) create mode 100644 config_policy_compression.go create mode 100644 config_policy_encryption.go create mode 100644 state.go create mode 100644 state_test.go diff --git a/config_partition_filter.go b/config_partition_filter.go index 0def8768..a62d3a16 100644 --- a/config_partition_filter.go +++ b/config_partition_filter.go @@ -66,21 +66,21 @@ func NewPartitionFilterAll() *a.PartitionFilter { // splitPartitions splits partition to groups. func splitPartitions(partitionFilters []*a.PartitionFilter, numWorkers int) ([]*a.PartitionFilter, error) { if numWorkers < 1 || numWorkers < len(partitionFilters) { - return nil, fmt.Errorf("numWorkers is less than partitionFilters, cannot split partitionFilters") + return nil, fmt.Errorf("numWorkers is less than PartitionFilters, cannot split PartitionFilters") } // Validations. for i := range partitionFilters { if partitionFilters[i].Begin < 0 { - return nil, fmt.Errorf("startPartition is less than 0, cannot split partitionFilters") + return nil, fmt.Errorf("startPartition is less than 0, cannot split PartitionFilters") } if partitionFilters[i].Count < 1 { - return nil, fmt.Errorf("numPartitions is less than 1, cannot split partitionFilters") + return nil, fmt.Errorf("numPartitions is less than 1, cannot split PartitionFilters") } if partitionFilters[i].Begin+partitionFilters[i].Count > MaxPartitions { - return nil, fmt.Errorf("startPartition + numPartitions is greater than the max partitionFilters: %d", + return nil, fmt.Errorf("startPartition + numPartitions is greater than the max PartitionFilters: %d", MaxPartitions) } } diff --git a/config_partition_filter_test.go b/config_partition_filter_test.go index 587859fb..7e8eab1a 100644 --- a/config_partition_filter_test.go +++ b/config_partition_filter_test.go @@ -129,7 +129,7 @@ func TestSplitPartitions_NumWorkersLessThanFilters(t *testing.T) { _, err := splitPartitions(partitionFilters, numWorkers) assert.Error(t, err) - assert.Equal(t, "numWorkers is less than partitionFilters, cannot split partitionFilters", err.Error()) + assert.Equal(t, "numWorkers is less than PartitionFilters, cannot split PartitionFilters", err.Error()) } func TestSplitPartitionRange(t *testing.T) { diff --git a/config_policy_compression.go b/config_policy_compression.go new file mode 100644 index 00000000..f93d967c --- /dev/null +++ b/config_policy_compression.go @@ -0,0 +1,64 @@ +// Copyright 2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backup + +import ( + "fmt" +) + +// Compression modes +const ( + // CompressNone no compression. + CompressNone = "NONE" + // CompressZSTD compression using ZSTD. + CompressZSTD = "ZSTD" +) + +// CompressionPolicy contains backup compression information. +type CompressionPolicy struct { + // The compression mode to be used (default is NONE). + Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,ZSTD"` + // The compression level to use (or -1 if unspecified). + Level int `yaml:"level,omitempty" json:"level,omitempty"` +} + +// NewCompressionPolicy returns new compression policy for backup/restore operations. +func NewCompressionPolicy(mode string, level int) *CompressionPolicy { + return &CompressionPolicy{ + Mode: mode, + Level: level, + } +} + +// validate validates the compression policy parameters. +func (p *CompressionPolicy) validate() error { + if p == nil { + return nil + } + + if p.Mode != CompressNone && p.Mode != CompressZSTD { + return fmt.Errorf("invalid compression mode: %s", p.Mode) + } + + if p.Level == 0 { + p.Level = -1 + } + + if p.Level < -1 { + return fmt.Errorf("invalid compression level: %d", p.Level) + } + + return nil +} diff --git a/config_policy_encryption.go b/config_policy_encryption.go new file mode 100644 index 00000000..f367e6a3 --- /dev/null +++ b/config_policy_encryption.go @@ -0,0 +1,66 @@ +// Copyright 2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backup + +import ( + "errors" + "fmt" +) + +// Encryption modes +const ( + // EncryptNone no encryption. + EncryptNone = "NONE" + // EncryptAES128 encryption using AES128 algorithm. + EncryptAES128 = "AES128" + // EncryptAES256 encryption using AES256 algorithm. + EncryptAES256 = "AES256" +) + +// EncryptionPolicy contains backup encryption information. +type EncryptionPolicy struct { + // The path to the file containing the encryption key. + KeyFile *string `yaml:"key-file,omitempty" json:"key-file,omitempty"` + // The name of the environment variable containing the encryption key. + KeyEnv *string `yaml:"key-env,omitempty" json:"key-env,omitempty"` + // The secret keyword in Aerospike Secret Agent containing the encryption key. + KeySecret *string `yaml:"key-secret,omitempty" json:"key-secret,omitempty"` + // The encryption mode to be used (NONE, AES128, AES256) + Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,AES128,AES256"` +} + +// validate validates the encryption policy. +func (p *EncryptionPolicy) validate() error { + if p == nil { + return nil + } + + if p.Mode != EncryptNone && p.Mode != EncryptAES128 && p.Mode != EncryptAES256 { + return fmt.Errorf("invalid encryption mode: %s", p.Mode) + } + + if p.KeyFile == nil && p.KeyEnv == nil && p.KeySecret == nil { + return errors.New("encryption key location not specified") + } + + // Only one parameter allowed to be set. + if (p.KeyFile != nil && p.KeyEnv != nil) || + (p.KeyFile != nil && p.KeySecret != nil) || + (p.KeyEnv != nil && p.KeySecret != nil) { + return fmt.Errorf("only one encryption key source may be specified") + } + + return nil +} diff --git a/state.go b/state.go new file mode 100644 index 00000000..4ef50807 --- /dev/null +++ b/state.go @@ -0,0 +1,132 @@ +package backup + +import ( + "context" + "encoding/gob" + "fmt" + "log/slog" + "os" + "time" + + a "github.com/aerospike/aerospike-client-go/v7" +) + +// State contains current backups status data. +type State struct { + // Global backup context. + ctx context.Context + + // File to save to. + fileName string + + // How often file will be saved to disk. + dumpTimeout time.Duration + + // List of applied partition filters + PartitionFilters []*a.PartitionFilter + + // Save files cursor. + // TODO: think how to map it to filters. + + // timestamp of last dump to file. + SavedAt time.Time + + // logger for logging errors. + logger *slog.Logger +} + +// NewState creates status service from parameters, for backup operations. +func NewState( + ctx context.Context, + fileName string, + dumpTimeout time.Duration, + partitionFilters []*a.PartitionFilter, + logger *slog.Logger, +) *State { + s := &State{ + ctx: ctx, + fileName: fileName, + dumpTimeout: dumpTimeout, + PartitionFilters: partitionFilters, + logger: logger, + } + // Run watcher on initialization. + go s.serve() + + return s +} + +// NewStateFromFile creates a status service from the file, for restore operations. +func NewStateFromFile(ctx context.Context, fileName string, logger *slog.Logger) (*State, error) { + // TODO: replace with io reader/writer. + reader, err := os.Open(fileName) + if err != nil { + return nil, fmt.Errorf("failed to open state file: %w", err) + } + + dec := gob.NewDecoder(reader) + + var state State + if err = dec.Decode(&state); err != nil { + return nil, fmt.Errorf("failed to decode state: %w", err) + } + + state.ctx = ctx + state.logger = logger + + return &state, nil +} + +// serve dumps files to disk. +func (s *State) serve() { + ticker := time.NewTicker(s.dumpTimeout) + defer ticker.Stop() + + // Dump a file at the very beginning. + if err := s.dump(); err != nil { + s.logger.Error("failed to dump state", slog.Any("error", err)) + return + } + + // Server ticker. + for { + select { + case <-s.ctx.Done(): + // saves state and exit + if err := s.dump(); err != nil { + s.logger.Error("failed to dump state", slog.Any("error", err)) + return + } + + return + case <-ticker.C: + // save state and sleep. + time.Sleep(time.Second) + // save intermediate state. + if err := s.dump(); err != nil { + s.logger.Error("failed to dump state", slog.Any("error", err)) + return + } + + s.SavedAt = time.Now() + } + } +} + +func (s *State) dump() error { + // TODO: replace with io reader/writer. + file, err := os.OpenFile(s.fileName, os.O_CREATE|os.O_WRONLY, 0o666) + if err != nil { + return fmt.Errorf("failed to create state file %s: %w", s.fileName, err) + } + + enc := gob.NewEncoder(file) + + // TODO: check if we must create copies from PartitionFilters. + + if err = enc.Encode(s); err != nil { + return fmt.Errorf("failed to encode state data: %w", err) + } + + return nil +} diff --git a/state_test.go b/state_test.go new file mode 100644 index 00000000..07d91ea4 --- /dev/null +++ b/state_test.go @@ -0,0 +1,47 @@ +package backup + +import ( + "context" + "log/slog" + "os" + "path/filepath" + "testing" + "time" + + a "github.com/aerospike/aerospike-client-go/v7" + "github.com/stretchr/testify/require" +) + +const ( + testDuration = 1 * time.Second +) + +func TestState(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + tempFile := filepath.Join(t.TempDir(), "state_test.gob") + pfs := []*a.PartitionFilter{ + NewPartitionFilterByID(1), + NewPartitionFilterByID(2), + } + logger := slog.New(slog.NewTextHandler(nil, nil)) + + // Check init. + state := NewState(ctx, tempFile, testDuration, pfs, logger) + + time.Sleep(testDuration * 3) + + require.NotZero(t, state.SavedAt) + cancel() + + // Check that file exists. + _, err := os.Stat(tempFile) + require.NoError(t, err) + + // Check restore. + newCtx := context.Background() + newState, err := NewStateFromFile(newCtx, tempFile, logger) + require.NoError(t, err) + require.Equal(t, newState.PartitionFilters, pfs) +}