Skip to content

Commit

Permalink
FMWK-570-backup-restore-state
Browse files Browse the repository at this point in the history
- wip on state support
  • Loading branch information
filkeith committed Oct 7, 2024
1 parent 8ce777b commit 7026e71
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 5 deletions.
8 changes: 4 additions & 4 deletions config_partition_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,21 @@ func NewPartitionFilterAll() *a.PartitionFilter {
// splitPartitions splits partition to groups.
func splitPartitions(partitionFilters []*a.PartitionFilter, numWorkers int) ([]*a.PartitionFilter, error) {
if numWorkers < 1 || numWorkers < len(partitionFilters) {
return nil, fmt.Errorf("numWorkers is less than partitionFilters, cannot split partitionFilters")
return nil, fmt.Errorf("numWorkers is less than PartitionFilters, cannot split PartitionFilters")
}

// Validations.
for i := range partitionFilters {
if partitionFilters[i].Begin < 0 {
return nil, fmt.Errorf("startPartition is less than 0, cannot split partitionFilters")
return nil, fmt.Errorf("startPartition is less than 0, cannot split PartitionFilters")
}

if partitionFilters[i].Count < 1 {
return nil, fmt.Errorf("numPartitions is less than 1, cannot split partitionFilters")
return nil, fmt.Errorf("numPartitions is less than 1, cannot split PartitionFilters")
}

if partitionFilters[i].Begin+partitionFilters[i].Count > MaxPartitions {
return nil, fmt.Errorf("startPartition + numPartitions is greater than the max partitionFilters: %d",
return nil, fmt.Errorf("startPartition + numPartitions is greater than the max PartitionFilters: %d",
MaxPartitions)
}
}
Expand Down
2 changes: 1 addition & 1 deletion config_partition_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestSplitPartitions_NumWorkersLessThanFilters(t *testing.T) {
_, err := splitPartitions(partitionFilters, numWorkers)

assert.Error(t, err)
assert.Equal(t, "numWorkers is less than partitionFilters, cannot split partitionFilters", err.Error())
assert.Equal(t, "numWorkers is less than PartitionFilters, cannot split PartitionFilters", err.Error())
}

func TestSplitPartitionRange(t *testing.T) {
Expand Down
64 changes: 64 additions & 0 deletions config_policy_compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backup

import (
"fmt"
)

// Compression modes
const (
// CompressNone no compression.
CompressNone = "NONE"

Check failure on line 24 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of CompressNone

Check failure on line 24 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of CompressNone

Check failure on line 24 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of CompressNone

Check failure on line 24 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of CompressNone
// CompressZSTD compression using ZSTD.
CompressZSTD = "ZSTD"

Check failure on line 26 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of CompressZSTD

Check failure on line 26 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of CompressZSTD

Check failure on line 26 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of CompressZSTD

Check failure on line 26 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of CompressZSTD
)

// CompressionPolicy contains backup compression information.
type CompressionPolicy struct {

Check failure on line 30 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of CompressionPolicy

Check failure on line 30 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of CompressionPolicy

Check failure on line 30 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of CompressionPolicy

Check failure on line 30 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of CompressionPolicy
// The compression mode to be used (default is NONE).
Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,ZSTD"`
// The compression level to use (or -1 if unspecified).
Level int `yaml:"level,omitempty" json:"level,omitempty"`
}

// NewCompressionPolicy returns new compression policy for backup/restore operations.
func NewCompressionPolicy(mode string, level int) *CompressionPolicy {

Check failure on line 38 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of NewCompressionPolicy

Check failure on line 38 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of NewCompressionPolicy

Check failure on line 38 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of NewCompressionPolicy

Check failure on line 38 in config_policy_compression.go

View workflow job for this annotation

GitHub Actions / tests

other declaration of NewCompressionPolicy
return &CompressionPolicy{
Mode: mode,
Level: level,
}
}

// validate validates the compression policy parameters.
func (p *CompressionPolicy) validate() error {
if p == nil {
return nil
}

if p.Mode != CompressNone && p.Mode != CompressZSTD {
return fmt.Errorf("invalid compression mode: %s", p.Mode)
}

if p.Level == 0 {
p.Level = -1
}

if p.Level < -1 {
return fmt.Errorf("invalid compression level: %d", p.Level)
}

return nil
}
66 changes: 66 additions & 0 deletions config_policy_encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backup

import (
"errors"
"fmt"
)

// Encryption modes
const (
// EncryptNone no encryption.
EncryptNone = "NONE"
// EncryptAES128 encryption using AES128 algorithm.
EncryptAES128 = "AES128"
// EncryptAES256 encryption using AES256 algorithm.
EncryptAES256 = "AES256"
)

// EncryptionPolicy contains backup encryption information.
type EncryptionPolicy struct {
// The path to the file containing the encryption key.
KeyFile *string `yaml:"key-file,omitempty" json:"key-file,omitempty"`
// The name of the environment variable containing the encryption key.
KeyEnv *string `yaml:"key-env,omitempty" json:"key-env,omitempty"`
// The secret keyword in Aerospike Secret Agent containing the encryption key.
KeySecret *string `yaml:"key-secret,omitempty" json:"key-secret,omitempty"`
// The encryption mode to be used (NONE, AES128, AES256)
Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,AES128,AES256"`
}

// validate validates the encryption policy.
func (p *EncryptionPolicy) validate() error {
if p == nil {
return nil
}

if p.Mode != EncryptNone && p.Mode != EncryptAES128 && p.Mode != EncryptAES256 {
return fmt.Errorf("invalid encryption mode: %s", p.Mode)
}

if p.KeyFile == nil && p.KeyEnv == nil && p.KeySecret == nil {
return errors.New("encryption key location not specified")
}

// Only one parameter allowed to be set.
if (p.KeyFile != nil && p.KeyEnv != nil) ||
(p.KeyFile != nil && p.KeySecret != nil) ||
(p.KeyEnv != nil && p.KeySecret != nil) {
return fmt.Errorf("only one encryption key source may be specified")
}

return nil
}
132 changes: 132 additions & 0 deletions state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package backup

import (
"context"
"encoding/gob"
"fmt"
"log/slog"
"os"
"time"

a "github.com/aerospike/aerospike-client-go/v7"
)

// State contains current backups status data.
type State struct {
// Global backup context.
ctx context.Context

// File to save to.
fileName string

// How often file will be saved to disk.
dumpTimeout time.Duration

// List of applied partition filters
PartitionFilters []*a.PartitionFilter

// Save files cursor.
// TODO: think how to map it to filters.

// timestamp of last dump to file.
SavedAt time.Time

// logger for logging errors.
logger *slog.Logger
}

// NewState creates status service from parameters, for backup operations.
func NewState(
ctx context.Context,
fileName string,
dumpTimeout time.Duration,
partitionFilters []*a.PartitionFilter,
logger *slog.Logger,
) *State {
s := &State{
ctx: ctx,
fileName: fileName,
dumpTimeout: dumpTimeout,
PartitionFilters: partitionFilters,
logger: logger,
}
// Run watcher on initialization.
go s.serve()

return s
}

// NewStateFromFile creates a status service from the file, for restore operations.
func NewStateFromFile(ctx context.Context, fileName string, logger *slog.Logger) (*State, error) {
// TODO: replace with io reader/writer.
reader, err := os.Open(fileName)
if err != nil {
return nil, fmt.Errorf("failed to open state file: %w", err)
}

dec := gob.NewDecoder(reader)

var state State
if err = dec.Decode(&state); err != nil {
return nil, fmt.Errorf("failed to decode state: %w", err)
}

state.ctx = ctx
state.logger = logger

return &state, nil
}

// serve dumps files to disk.
func (s *State) serve() {
ticker := time.NewTicker(s.dumpTimeout)
defer ticker.Stop()

// Dump a file at the very beginning.
if err := s.dump(); err != nil {
s.logger.Error("failed to dump state", slog.Any("error", err))
return
}

// Server ticker.
for {
select {
case <-s.ctx.Done():
// saves state and exit
if err := s.dump(); err != nil {
s.logger.Error("failed to dump state", slog.Any("error", err))
return
}

return
case <-ticker.C:
// save state and sleep.
time.Sleep(time.Second)
// save intermediate state.
if err := s.dump(); err != nil {
s.logger.Error("failed to dump state", slog.Any("error", err))
return
}

s.SavedAt = time.Now()
}
}
}

func (s *State) dump() error {
// TODO: replace with io reader/writer.
file, err := os.OpenFile(s.fileName, os.O_CREATE|os.O_WRONLY, 0o666)
if err != nil {
return fmt.Errorf("failed to create state file %s: %w", s.fileName, err)
}

enc := gob.NewEncoder(file)

// TODO: check if we must create copies from PartitionFilters.

if err = enc.Encode(s); err != nil {
return fmt.Errorf("failed to encode state data: %w", err)
}

return nil
}
47 changes: 47 additions & 0 deletions state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package backup

import (
"context"
"log/slog"
"os"
"path/filepath"
"testing"
"time"

a "github.com/aerospike/aerospike-client-go/v7"
"github.com/stretchr/testify/require"
)

const (
testDuration = 1 * time.Second
)

func TestState(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
tempFile := filepath.Join(t.TempDir(), "state_test.gob")
pfs := []*a.PartitionFilter{
NewPartitionFilterByID(1),
NewPartitionFilterByID(2),
}
logger := slog.New(slog.NewTextHandler(nil, nil))

// Check init.
state := NewState(ctx, tempFile, testDuration, pfs, logger)

time.Sleep(testDuration * 3)

require.NotZero(t, state.SavedAt)
cancel()

// Check that file exists.
_, err := os.Stat(tempFile)
require.NoError(t, err)

// Check restore.
newCtx := context.Background()
newState, err := NewStateFromFile(newCtx, tempFile, logger)
require.NoError(t, err)
require.Equal(t, newState.PartitionFilters, pfs)
}

0 comments on commit 7026e71

Please sign in to comment.