feat: save expired batches in db (#4203)
acha-bill authored Jul 10, 2023
1 parent b4270fb commit f170e38
Showing 3 changed files with 88 additions and 41 deletions.
112 changes: 80 additions & 32 deletions pkg/storer/reserve.go
@@ -13,18 +13,19 @@ import (
 	"time"
 
 	"github.com/ethersphere/bee/pkg/postage"
-	storage "github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storage/storageutil"
 	"github.com/ethersphere/bee/pkg/storer/internal"
 	"github.com/ethersphere/bee/pkg/storer/internal/reserve"
 	"github.com/ethersphere/bee/pkg/swarm"
+	"golang.org/x/exp/slices"
 )
 
 const (
 	reserveOverCapacity  = "reserveOverCapacity"
 	reserveUnreserved    = "reserveUnreserved"
 	reserveUpdateLockKey = "reserveUpdateLockKey"
 	batchExpiry          = "batchExpiry"
-	expiredBatchAccess   = "expiredBatchAccess"
 
 	cleanupDur = time.Hour * 6
 )
@@ -123,38 +124,41 @@ func (db *DB) evictionWorker(ctx context.Context) {
 	defer overCapUnsub()
 
 	cleanupExpired := func() {
-		db.lock.Lock(expiredBatchAccess)
-		batchesToEvict := make([][]byte, len(db.expiredBatches))
-		copy(batchesToEvict, db.expiredBatches)
-		db.expiredBatches = nil
-		db.lock.Unlock(expiredBatchAccess)
-
 		defer db.events.Trigger(reserveUnreserved)
 
-		if len(batchesToEvict) > 0 {
-			for _, batchID := range batchesToEvict {
-				evicted, err := db.evictBatch(ctx, batchID, swarm.MaxBins)
-				if err != nil {
-					db.logger.Error(err, "evict batch", "batch", hex.EncodeToString(batchID))
-				}
-				if evicted > 0 {
-					db.logger.Debug(
-						"evicted expired batch",
-						"batch", hex.EncodeToString(batchID),
-						"total_evicted", evicted,
-					)
-				}
-				db.metrics.ExpiredBatchCount.Inc()
-			}
-		}
+		var batchesToEvict [][]byte
+		err := db.repo.IndexStore().Iterate(storage.Query{
+			Factory:      func() storage.Item { return new(expiredBatchItem) },
+			ItemProperty: storage.QueryItemID,
+		}, func(result storage.Result) (bool, error) {
+			batchesToEvict = append(batchesToEvict, []byte(result.ID))
+			return false, nil
+		})
+		if err != nil {
+			db.logger.Error(err, "iterate expired batches")
+		}
+
+		for _, batchID := range batchesToEvict {
+			evicted, err := db.evictBatch(ctx, batchID, swarm.MaxBins)
+			if err != nil {
+				db.logger.Error(err, "evict batch", "batch_id", hex.EncodeToString(batchID))
+			}
+			if evicted > 0 {
+				db.logger.Debug("evicted expired batch", "batch_id", hex.EncodeToString(batchID), "total_evicted", evicted)
+
+				err = db.Execute(ctx, func(tx internal.Storage) error {
+					return tx.IndexStore().Delete(&expiredBatchItem{BatchID: batchID})
+				})
+				if err != nil {
+					db.logger.Error(err, "delete expired batch", "batch_id", hex.EncodeToString(batchID))
+				}
+			}
+			db.metrics.ExpiredBatchCount.Inc()
+		}
 	}
 
-	time.AfterFunc(30*time.Minute, func() {
-		db.logger.Info("initial reserve cleanup started")
-		if err := db.reserveCleanup(ctx); err != nil {
-			db.logger.Error(err, "cleanup")
-		}
-	})
+	// Initial cleanup.
+	db.events.Trigger(batchExpiry)
 
 	cleanUpTicker := time.NewTicker(cleanupDur)
 	defer cleanUpTicker.Stop()
@@ -178,7 +182,7 @@ func (db *DB) evictionWorker(ctx context.Context) {
 			cleanupExpired()
 
 			if err := db.reserveCleanup(ctx); err != nil {
-				db.logger.Error(err, "cleanup")
+				db.logger.Error(err, "reserve cleanup")
 			}
 		}
 	}
@@ -270,19 +274,63 @@ func (db *DB) ReservePutter() storage.Putter {
 	}
 }
 
+// expiredBatchItem is a storage.Item implementation for expired batches.
+type expiredBatchItem struct {
+	BatchID []byte
+}
+
+// ID implements storage.Item.
+func (e *expiredBatchItem) ID() string {
+	return string(e.BatchID)
+}
+
+// Namespace implements storage.Item.
+func (e *expiredBatchItem) Namespace() string {
+	return "expiredBatchItem"
+}
+
+// Marshal implements storage.Item.
+// It is a no-op as expiredBatchItem is not serialized.
+func (e *expiredBatchItem) Marshal() ([]byte, error) {
+	return nil, nil
+}
+
+// Unmarshal implements storage.Item.
+// It is a no-op as expiredBatchItem is not serialized.
+func (e *expiredBatchItem) Unmarshal(_ []byte) error {
+	return nil
+}
+
+// Clone implements storage.Item.
+func (e *expiredBatchItem) Clone() storage.Item {
+	if e == nil {
+		return nil
+	}
+	return &expiredBatchItem{
+		BatchID: slices.Clone(e.BatchID),
+	}
+}
+
+// String implements storage.Item.
+func (e *expiredBatchItem) String() string {
+	return storageutil.JoinFields(e.Namespace(), e.ID())
+}

 // EvictBatch evicts all chunks belonging to a batch from the reserve.
-func (db *DB) EvictBatch(ctx context.Context, batchID []byte) (err error) {
+func (db *DB) EvictBatch(ctx context.Context, batchID []byte) error {
 	if db.reserve == nil {
 		// if reserve is not configured, do nothing
 		return nil
 	}
 
-	db.lock.Lock(expiredBatchAccess)
-	db.expiredBatches = append(db.expiredBatches, batchID)
-	db.lock.Unlock(expiredBatchAccess)
+	err := db.Execute(ctx, func(tx internal.Storage) error {
+		return tx.IndexStore().Put(&expiredBatchItem{BatchID: batchID})
+	})
+	if err != nil {
+		return fmt.Errorf("save expired batch: %w", err)
+	}
 
 	db.events.Trigger(batchExpiry)
 
 	return nil
 }
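
Taken together, the change persists a per-batch marker when a batch expires and consumes that marker from the eviction worker. Below is a minimal sketch of that round-trip; it is not part of the commit. The helper names (markBatchExpired, expiredBatchIDs, clearExpiredBatch) are hypothetical, and it assumes a plain storage.Store is available for the index store, using only the Put, Iterate, and Delete calls that appear in the diff above.

// Illustrative only, assumed to live in package storer next to expiredBatchItem.
package storer

import (
	"github.com/ethersphere/bee/pkg/storage"
)

// markBatchExpired persists the expiry marker so a pending eviction
// survives a node restart (what EvictBatch does above).
func markBatchExpired(st storage.Store, batchID []byte) error {
	return st.Put(&expiredBatchItem{BatchID: batchID})
}

// expiredBatchIDs collects the IDs of all batches still marked as expired,
// the same scan cleanupExpired performs before evicting.
func expiredBatchIDs(st storage.Store) ([][]byte, error) {
	var ids [][]byte
	err := st.Iterate(storage.Query{
		Factory:      func() storage.Item { return new(expiredBatchItem) },
		ItemProperty: storage.QueryItemID, // keys only: the item body is never serialized
	}, func(res storage.Result) (bool, error) {
		ids = append(ids, []byte(res.ID))
		return false, nil // false: keep iterating
	})
	return ids, err
}

// clearExpiredBatch drops the marker once the batch has actually been evicted.
func clearExpiredBatch(st storage.Store, batchID []byte) error {
	return st.Delete(&expiredBatchItem{BatchID: batchID})
}

Because Marshal and Unmarshal are no-ops, the item carries no value: the batch ID lives entirely in the key, which is why the iteration only requests storage.QueryItemID.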

16 changes: 8 additions & 8 deletions pkg/storer/reserve_test.go
@@ -17,10 +17,10 @@ import (
 	postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
 	pullerMock "github.com/ethersphere/bee/pkg/puller/mock"
 	"github.com/ethersphere/bee/pkg/spinlock"
-	storage "github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/storage/storagetest"
 	chunk "github.com/ethersphere/bee/pkg/storage/testing"
-	storer "github.com/ethersphere/bee/pkg/storer"
+	"github.com/ethersphere/bee/pkg/storer"
 	"github.com/ethersphere/bee/pkg/storer/internal/chunkstamp"
 	"github.com/ethersphere/bee/pkg/storer/internal/reserve"
 	"github.com/ethersphere/bee/pkg/storer/internal/stampindex"
@@ -35,24 +35,24 @@ func TestIndexCollision(t *testing.T) {
 		stamp := postagetesting.MustNewBatchStamp(postagetesting.MustNewBatch().ID)
 		putter := storer.ReservePutter()
 
-		ch_1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(stamp)
-		err := putter.Put(context.Background(), ch_1)
+		ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(stamp)
+		err := putter.Put(context.Background(), ch1)
 		if err != nil {
 			t.Fatal(err)
 		}
 
-		ch_2 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(stamp)
-		err = putter.Put(context.Background(), ch_2)
+		ch2 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(stamp)
+		err = putter.Put(context.Background(), ch2)
 		if err == nil {
 			t.Fatal("expected index collision error")
 		}
 
-		_, err = storer.ReserveGet(context.Background(), ch_2.Address(), ch_2.Stamp().BatchID())
+		_, err = storer.ReserveGet(context.Background(), ch2.Address(), ch2.Stamp().BatchID())
 		if !errors.Is(err, storage.ErrNotFound) {
 			t.Fatal(err)
 		}
 
-		_, err = storer.ReserveGet(context.Background(), ch_1.Address(), ch_1.Stamp().BatchID())
+		_, err = storer.ReserveGet(context.Background(), ch1.Address(), ch1.Stamp().BatchID())
 		if err != nil {
 			t.Fatal(err)
 		}
1 change: 0 additions & 1 deletion pkg/storer/storer.go
@@ -473,7 +473,6 @@ type DB struct {
 	setSyncerOnce    sync.Once
 	syncer           Syncer
 	opts             workerOpts
-	expiredBatches   [][]byte
 }
 
 type workerOpts struct {
