Skip to content

Commit

Permalink
Make periodic pruning configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcdee committed Jan 18, 2024
1 parent 4ec3efb commit c9bcbdb
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 34 deletions.
2 changes: 2 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ server:
rules:
# admin-ips is a list of IP addresses from which requests for voluntary exits will be accepted.
admin-ips: [ 1.2.3.4, 5.6.7.8 ]
# periodic-pruning, if enabled, prunes the rules storage every 1-2 days to keep the database size down.
periodic-pruning: true
certificates:
# server-cert is the majordomo URL to the server's certificate.
server-cert: file:///home/me/dirk/security/certificates/myserver.example.com.crt
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func initRules(ctx context.Context) (rules.Service, error) {
standardrules.WithLogLevel(util.LogLevel("rules")),
standardrules.WithStoragePath(util.ResolvePath(viper.GetString("storage-path"))),
standardrules.WithAdminIPs(viper.GetStringSlice("server.rules.admin-ips")),
standardrules.WithPeriodicPruning(viper.GetBool("server.rules.periodic-pruning")),
)
}

Expand Down
14 changes: 11 additions & 3 deletions rules/standard/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
)

// parameters holds the configurable options for the standard rules service.
// The flattened diff left both the pre- and post-commit field sets in this
// struct (duplicate field names); only the post-commit set is kept here.
type parameters struct {
	logLevel        zerolog.Level // log verbosity for the "rules" module
	storagePath     string        // filesystem path for the badger rules database
	adminIPs        []string      // IPs allowed to request voluntary exits
	periodicPruning bool          // if true, schedule periodic value-log GC of the database
}

// Parameter is the interface for service parameters.
Expand Down Expand Up @@ -57,6 +58,13 @@ func WithAdminIPs(adminIPs []string) Parameter {
})
}

// WithPeriodicPruning enables periodic pruning of the rules database.
func WithPeriodicPruning(periodicPruning bool) Parameter {
	setter := func(p *parameters) {
		p.periodicPruning = periodicPruning
	}
	return parameterFunc(setter)
}

// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct.
func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
parameters := parameters{
Expand Down
6 changes: 4 additions & 2 deletions rules/standard/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {
log = log.Level(parameters.logLevel)
}

store, err := NewStore(ctx, parameters.storagePath)
store, err := NewStore(ctx, parameters.storagePath, parameters.periodicPruning)
if err != nil {
return nil, err
}
Expand All @@ -68,7 +68,9 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) {

// Close closes the database for the persistent rules information.
// The flattened diff left the old unconditional Stop() call alongside the new
// nil-guarded one; only the guarded form is kept. gcTicker is nil when the
// store was created with periodic pruning disabled, so it must be checked
// before stopping.
func (s *Service) Close(ctx context.Context) error {
	if s.store.gcTicker != nil {
		s.store.gcTicker.Stop()
	}
	return s.store.Close(ctx)
}

Expand Down
58 changes: 35 additions & 23 deletions rules/standard/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ package standard

import (
"context"
"math/rand"
"crypto/rand"
"math/big"
"time"

"github.com/attestantio/dirk/util/loggers"
Expand All @@ -32,7 +33,10 @@ type Store struct {
}

// NewStore creates a new badger store.
func NewStore(ctx context.Context, base string) (*Store, error) {
func NewStore(ctx context.Context,
base string,
periodicPruning bool,
) (*Store, error) {
opt := badger.DefaultOptions(base)
opt.TableLoadingMode = options.LoadToRAM
opt.ValueLogLoadingMode = options.MemoryMap
Expand All @@ -43,30 +47,38 @@ func NewStore(ctx context.Context, base string) (*Store, error) {
return nil, err
}

// Garbage collect every day or two.
// Use a random offset to avoid multiple Dirk instances started simultaneously
// running this procedure at the same time.
//nolint:gosec
period := 24*time.Hour + time.Duration(int(rand.Int31()%60*24))*time.Minute
ticker := time.NewTicker(period)
go func(db *badger.DB) {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for {
log.Info().Msg("Running storage garbage collection")
if err = db.RunValueLogGC(0.7); err != nil {
// Error occurs when there is nothing left to collect.
log.Info().Msg("Completed storage garbage collection")
break
var ticker *time.Ticker
if periodicPruning {
// Garbage collect every day or two.
// Use a random offset to avoid multiple Dirk instances started simultaneously
// running this procedure at the same time.
offset, err := rand.Int(rand.Reader, big.NewInt(86400))
if err != nil {
return nil, errors.Wrap(err, "random number generator failure")
}
period := 24*time.Hour + time.Duration(time.Duration(offset.Int64())*time.Second)

Check failure on line 59 in rules/standard/storage.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary conversion (unconvert)
ticker = time.NewTicker(period)
go func(db *badger.DB) {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for {
log.Trace().Msg("Running storage garbage collection")
if err = db.RunValueLogGC(0.7); err != nil {
// Error occurs when there is nothing left to collect.
log.Trace().Msg("Completed storage garbage collection")
break
}
}
}
}
}
}(db)
log.Info().Stringer("period", period).Msg("Storage garbage collection routine scheduled")
}(db)
log.Info().Stringer("period", period).Msg("Storage garbage collection routine scheduled")
} else {
log.Info().Msg("Storage garbage collection routine not scheduled")
}

return &Store{
db: db,
Expand Down
12 changes: 6 additions & 6 deletions rules/standard/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
// TestService verifies store construction: an unwritable path fails, a
// writable temporary directory succeeds. The flattened diff left both the
// old 2-argument and new 3-argument NewStore calls; only the post-commit
// 3-argument calls (with periodic pruning disabled) are kept.
func TestService(t *testing.T) {
	ctx := context.Background()

	// Creating a store at a non-existent, uncreatable path must fail.
	_, err := standardrules.NewStore(ctx, "/does/not/exist", false)
	assert.Contains(t, err.Error(), "Error Creating Dir")

	tmpDir, err := os.MkdirTemp("", "")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)

	// Creating a store in a writable directory must succeed.
	_, err = standardrules.NewStore(ctx, tmpDir, false)
	assert.NoError(t, err)
}

Expand All @@ -44,7 +44,7 @@ func TestStore(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

service, err := standardrules.NewStore(ctx, tmpDir)
service, err := standardrules.NewStore(ctx, tmpDir, false)
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestBatchStore(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

service, err := standardrules.NewStore(ctx, tmpDir)
service, err := standardrules.NewStore(ctx, tmpDir, false)
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestFetch(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

service, err := standardrules.NewStore(ctx, tmpDir)
service, err := standardrules.NewStore(ctx, tmpDir, false)
require.NoError(t, err)

require.NoError(t, service.Store(context.Background(), []byte("key"), []byte("value")))
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestFetchAll(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

service, err := standardrules.NewStore(ctx, tmpDir)
service, err := standardrules.NewStore(ctx, tmpDir, false)
require.NoError(t, err)

keys := [][49]byte{
Expand Down

0 comments on commit c9bcbdb

Please sign in to comment.