Skip to content

Commit

Permalink
feat: store expired batches in db
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Jul 5, 2023
1 parent 389e7f8 commit 958206e
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 22 deletions.
28 changes: 28 additions & 0 deletions pkg/postage/batchstore/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type BatchStore struct {
existsFn func([]byte) (bool, error)

mtx sync.Mutex

expiredMu sync.Mutex
expiredBatches map[string]struct{}
}

func (bs *BatchStore) SetBatchExpiryHandler(eh postage.BatchExpiryHandler) {}
Expand All @@ -49,6 +52,7 @@ func New(opts ...Option) *BatchStore {
for _, o := range opts {
o(bs)
}
bs.expiredBatches = make(map[string]struct{})
return bs
}

Expand Down Expand Up @@ -223,6 +227,30 @@ func (bs *BatchStore) ResetCalls() int {
return bs.resetCallCount
}

func (bs *BatchStore) SaveExpired(batchID []byte) error {
bs.expiredMu.Lock()
defer bs.expiredMu.Unlock()
bs.expiredBatches[string(batchID)] = struct{}{}
return nil
}

func (bs *BatchStore) Expired() ([][]byte, error) {
bs.expiredMu.Lock()
defer bs.expiredMu.Unlock()
expired := make([][]byte, 0, len(bs.expiredBatches))
for id := range bs.expiredBatches {
expired = append(expired, []byte(id))
}
return expired, nil
}

func (bs *BatchStore) DeleteExpired(batchID []byte) error {
bs.expiredMu.Lock()
defer bs.expiredMu.Unlock()
delete(bs.expiredBatches, string(batchID))
return nil
}

type MockEventUpdater struct {
inProgress bool
err error
Expand Down
22 changes: 21 additions & 1 deletion pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
valueKeyPrefix = "batchstore_value_"
chainStateKey = "batchstore_chainstate"
reserveRadiusKey = "batchstore_radius"
expiredKeyPrefix = "batchstore_expired_"
)

// ErrNotFound signals that the element was not found.
Expand Down Expand Up @@ -81,7 +82,6 @@ func New(st storage.StateStorer, ev evictFn, capacity int, logger log.Logger) (p
s.cs.Store(cs)

s.radius.Store(uint32(radius))

return s, nil
}

Expand Down Expand Up @@ -126,6 +126,26 @@ func (s *store) Iterate(cb func(*postage.Batch) (bool, error)) error {
})
}

func (s *store) SaveExpired(batchID []byte) error {
return s.store.Put(expiredKeyPrefix+string(batchID), nil)
}

func (s *store) Expired() ([][]byte, error) {
var expired [][]byte
err := s.store.Iterate(expiredKeyPrefix, func(key, _ []byte) (bool, error) {
expired = append(expired, key[len(expiredKeyPrefix):])
return false, nil
})
if err != nil {
return nil, err
}
return expired, nil
}

func (s *store) DeleteExpired(ID []byte) error {
return s.store.Delete(expiredKeyPrefix + string(ID))
}

// Save is implementation of postage.Storer interface Save method.
// This method has side effects; it also updates the radius of the node if successful.
func (s *store) Save(batch *postage.Batch) error {
Expand Down
21 changes: 21 additions & 0 deletions pkg/postage/batchstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package batchstore_test

import (
"bytes"
"errors"
"math/big"
"math/rand"
Expand Down Expand Up @@ -227,6 +228,26 @@ func TestBatchStore_Reset(t *testing.T) {
}
}

func TestExpired(t *testing.T) {
st := setupBatchStore(t, defaultCapacity)
id := postagetest.MustNewID()
err := st.SaveExpired(id)
if err != nil {
t.Fatal(err)
}

expired, err := st.Expired()
if err != nil {
t.Fatal(err)
}
if len(expired) != 1 {
t.Fatalf("want 1 expired batch. got %d", len(expired))
}
if !bytes.Equal(expired[0], id) {
t.Fatal("mismatched batch id")
}
}

type testBatch struct {
depth uint8
value int
Expand Down
9 changes: 9 additions & 0 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ type Storer interface {
Reset() error

SetBatchExpiryHandler(BatchExpiryHandler)

// SaveExpired saves the expired batch id in the store.
SaveExpired(_ []byte) error

// Expired returns the expired batch id from the store.
Expired() ([][]byte, error)

// DeleteExpired deletes the expired batch id from the store.
DeleteExpired([]byte) error
}

// StorageRadiusSetter is used to calculate total batch commitment of the network.
Expand Down
6 changes: 6 additions & 0 deletions pkg/postage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ func (b *NoOpBatchStore) SetStorageRadius(func(uint8) uint8) error { return nil
func (b *NoOpBatchStore) Commitment() (uint64, error) { return 0, nil }

func (b *NoOpBatchStore) Reset() error { return nil }

func (b *NoOpBatchStore) SaveExpired(_ []byte) error { return nil }

func (b *NoOpBatchStore) Expired() ([][]byte, error) { return nil, nil }

func (b *NoOpBatchStore) DeleteExpired(_ []byte) error { return nil }
42 changes: 22 additions & 20 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const (
reserveUnreserved = "reserveUnreserved"
reserveUpdateLockKey = "reserveUpdateLockKey"
batchExpiry = "batchExpiry"
expiredBatchAccess = "expiredBatchAccess"

cleanupDur = time.Hour * 6
)
Expand Down Expand Up @@ -119,21 +118,24 @@ func (db *DB) evictionWorker(ctx context.Context) {
defer overCapUnsub()

cleanupExpired := func() {
db.lock.Lock(expiredBatchAccess)
batchesToEvict := make([][]byte, len(db.expiredBatches))
copy(batchesToEvict, db.expiredBatches)
db.expiredBatches = nil
db.lock.Unlock(expiredBatchAccess)

defer db.events.Trigger(reserveUnreserved)
expiredBatches, err := db.batchstore.Expired()
if err != nil {
db.logger.Error(err, "get expired batches")
return
}
for _, batchID := range expiredBatches {
db.metrics.ExpiredBatchCount.Inc()

if len(batchesToEvict) > 0 {
for _, batchID := range batchesToEvict {
err := db.evictBatch(ctx, batchID, swarm.MaxBins)
if err != nil {
db.logger.Error(err, "evict batch")
}
db.metrics.ExpiredBatchCount.Inc()
err = db.evictBatch(ctx, batchID, swarm.MaxBins)
if err != nil {
db.logger.Error(err, "evict batch")
continue
}

err = db.batchstore.DeleteExpired(batchID)
if err != nil {
db.logger.Error(err, "delete expired")
}
}
}
Expand All @@ -160,7 +162,7 @@ func (db *DB) evictionWorker(ctx context.Context) {
cleanupExpired()

if err := db.reserveCleanup(ctx); err != nil {
db.logger.Error(err, "cleanup")
db.logger.Error(err, "reserve cleanup")
}
}
}
Expand Down Expand Up @@ -253,18 +255,18 @@ func (db *DB) ReservePutter() storage.Putter {
}

// EvictBatch evicts all chunks belonging to a batch from the reserve.
func (db *DB) EvictBatch(ctx context.Context, batchID []byte) (err error) {
func (db *DB) EvictBatch(ctx context.Context, batchID []byte) error {
if db.reserve == nil {
// if reserve is not configured, do nothing
return nil
}

db.lock.Lock(expiredBatchAccess)
db.expiredBatches = append(db.expiredBatches, batchID)
db.lock.Unlock(expiredBatchAccess)
err := db.batchstore.SaveExpired(batchID)
if err != nil {
return fmt.Errorf("save expired batch: %w", err)
}

db.events.Trigger(batchExpiry)

return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ type DB struct {
setSyncerOnce sync.Once
syncer Syncer
opts workerOpts
expiredBatches [][]byte
}

type workerOpts struct {
Expand Down

0 comments on commit 958206e

Please sign in to comment.