From 78120b97bf0a57a190e7c74834041b395dd236b2 Mon Sep 17 00:00:00 2001
From: aloknerurkar
Date: Mon, 3 Jul 2023 19:02:51 +0530
Subject: [PATCH] fix: issues from public testnet testing (#4202)

---
 pkg/api/api.go                               |   6 +-
 pkg/api/bytes.go                             |   2 -
 pkg/api/bzz.go                               |   2 -
 pkg/api/chunk.go                             |   2 -
 pkg/api/chunk_stream.go                      |   2 -
 pkg/api/feed.go                              |   2 -
 pkg/api/postage.go                           |   2 +-
 pkg/api/pss.go                               |   2 +-
 pkg/api/soc.go                               |   2 -
 pkg/api/stewardship.go                       |   2 +-
 pkg/node/node.go                             |   1 -
 pkg/postage/mock/service.go                  |  20 +--
 pkg/postage/postagecontract/contract_test.go |   6 +-
 pkg/postage/service.go                       |  43 +++--
 pkg/postage/service_test.go                  |  52 ++++--
 pkg/pullsync/pullsync.go                     |  10 +-
 pkg/pusher/pusher.go                         |   2 +-
 pkg/storer/internal/reserve/reserve.go       |  13 +-
 pkg/storer/metrics.go                        |   9 +
 pkg/storer/reserve.go                        | 173 +++++++++++++++----
 pkg/storer/reserve_test.go                   |   9 +
 pkg/storer/storer.go                         |  12 +-
 pkg/storer/subscribe_push.go                 |   7 +-
 23 files changed, 255 insertions(+), 126 deletions(-)

diff --git a/pkg/api/api.go b/pkg/api/api.go
index 44867c7af54..8aa658dcfb8 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -795,13 +795,13 @@ func (p *putterSessionWrapper) Cleanup() error {
     return errors.Join(p.PutterSession.Cleanup(), p.save(false))
 }

-func (s *Service) getStamper(batchID []byte) (postage.Stamper, func(bool) error, error) {
+func (s *Service) getStamper(ctx context.Context, batchID []byte) (postage.Stamper, func(bool) error, error) {
     exists, err := s.batchStore.Exists(batchID)
     if err != nil {
         return nil, nil, fmt.Errorf("batch exists: %w", err)
     }

-    issuer, save, err := s.post.GetStampIssuer(batchID)
+    issuer, save, err := s.post.GetStampIssuer(ctx, batchID)
     if err != nil {
         return nil, nil, fmt.Errorf("stamp issuer: %w", err)
     }
@@ -818,7 +818,7 @@ func (s *Service) newStamperPutter(ctx context.Context, opts putterOptions) (sto
         return nil, errUnsupportedDevNodeOperation
     }

-    stamper, save, err := s.getStamper(opts.BatchID)
+    stamper, save, err := s.getStamper(ctx, opts.BatchID)
     if err != nil {
         return nil, fmt.Errorf("get stamper: %w", err)
     }
diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go
index 701dcd21bc9..d9453aa5624 100644
--- a/pkg/api/bytes.go
+++ b/pkg/api/bytes.go
@@ -77,8 +77,6 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
             jsonhttp.BadRequest(w, "invalid batch id")
         case errors.Is(err, errUnsupportedDevNodeOperation):
             jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
-        case errors.Is(err, postage.ErrBatchInUse):
-            jsonhttp.BadRequest(w, postage.ErrBatchInUse)
         default:
             jsonhttp.BadRequest(w, nil)
         }
diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go
index acf0facf232..fb467ea3115 100644
--- a/pkg/api/bzz.go
+++ b/pkg/api/bzz.go
@@ -86,8 +86,6 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
             jsonhttp.BadRequest(w, "invalid batch id")
         case errors.Is(err, errUnsupportedDevNodeOperation):
             jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
-        case errors.Is(err, postage.ErrBatchInUse):
-            jsonhttp.BadRequest(w, postage.ErrBatchInUse)
         default:
             jsonhttp.BadRequest(w, nil)
         }
diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go
index 9dc5312d857..910270abf8a 100644
--- a/pkg/api/chunk.go
+++ b/pkg/api/chunk.go
@@ -79,8 +79,6 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
             jsonhttp.BadRequest(w, "invalid batch id")
         case errors.Is(err, errUnsupportedDevNodeOperation):
             jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
-        case errors.Is(err, postage.ErrBatchInUse):
-            jsonhttp.BadRequest(w, postage.ErrBatchInUse)
         default:
             jsonhttp.BadRequest(w, nil)
         }
diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go
index 36c130bf3d8..005ba5020d1 100644
--- a/pkg/api/chunk_stream.go
+++ b/pkg/api/chunk_stream.go
@@ -73,8 +73,6 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
             jsonhttp.BadRequest(w, "invalid batch id")
         case errors.Is(err, errUnsupportedDevNodeOperation):
             jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
-        case errors.Is(err, postage.ErrBatchInUse):
-            jsonhttp.BadRequest(w, postage.ErrBatchInUse)
         default:
             jsonhttp.BadRequest(w, nil)
         }
diff --git a/pkg/api/feed.go b/pkg/api/feed.go
index 1d10df6f509..42a48f523f3 100644
--- a/pkg/api/feed.go
+++ b/pkg/api/feed.go
@@ -182,8 +182,6 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
             jsonhttp.BadRequest(w, "invalid batch id")
         case errors.Is(err, errUnsupportedDevNodeOperation):
             jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
-        case errors.Is(err, postage.ErrBatchInUse):
-            jsonhttp.BadRequest(w, postage.ErrBatchInUse)
         default:
             jsonhttp.BadRequest(w, nil)
         }
diff --git a/pkg/api/postage.go b/pkg/api/postage.go
index 05e4006eb29..5b08ac8014c 100644
--- a/pkg/api/postage.go
+++ b/pkg/api/postage.go
@@ -277,7 +277,7 @@ func (s *Service) postageGetStampBucketsHandler(w http.ResponseWriter, r *http.R
     }

     hexBatchID := hex.EncodeToString(paths.BatchID)
-    issuer, save, err := s.post.GetStampIssuer(paths.BatchID)
+    issuer, save, err := s.post.GetStampIssuer(r.Context(), paths.BatchID)
     if err != nil {
         logger.Debug("get stamp issuer: get issuer failed", "batch_id", hexBatchID, "error", err)
         logger.Error(nil, "get stamp issuer: get issuer failed")
diff --git a/pkg/api/pss.go b/pkg/api/pss.go
index f64affe56b1..3fa63759e95 100644
--- a/pkg/api/pss.go
+++ b/pkg/api/pss.go
@@ -79,7 +79,7 @@ func (s *Service) pssPostHandler(w http.ResponseWriter, r *http.Request) {
         jsonhttp.InternalServerError(w, "pss send failed")
         return
     }
-    i, save, err := s.post.GetStampIssuer(headers.BatchID)
+    i, save, err := s.post.GetStampIssuer(r.Context(), headers.BatchID)
     if err != nil {
         logger.Debug("get postage batch issuer failed", "batch_id", hex.EncodeToString(headers.BatchID), "error", err)
         logger.Error(nil, "get postage batch issuer failed")
diff --git a/pkg/api/soc.go b/pkg/api/soc.go
index f7219c68614..6dfafadda28 100644
--- a/pkg/api/soc.go
+++ b/pkg/api/soc.go
@@ -87,8 +87,6 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
             jsonhttp.NotFound(w, "batch with id not found")
         case errors.Is(err, errInvalidPostageBatch):
             jsonhttp.BadRequest(w, "invalid batch id")
-        case errors.Is(err, postage.ErrBatchInUse):
-            jsonhttp.BadRequest(w, postage.ErrBatchInUse)
         default:
             jsonhttp.BadRequest(w, nil)
         }
diff --git a/pkg/api/stewardship.go b/pkg/api/stewardship.go
index 8f2f5a961c1..d46c5228f52 100644
--- a/pkg/api/stewardship.go
+++ b/pkg/api/stewardship.go
@@ -53,7 +53,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request)
     } else {
         batchID = headers.BatchID
     }
-    stamper, save, err := s.getStamper(batchID)
+    stamper, save, err := s.getStamper(r.Context(), batchID)
     if err != nil {
         switch {
         case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
diff --git a/pkg/node/node.go b/pkg/node/node.go
index be70fc1037c..9c603bbc3c4 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -777,7 +777,6 @@ func NewBee(
         lo.ReserveCapacity = ReserveCapacity
         lo.ReserveWakeUpDuration = reserveWakeUpDuration
         lo.RadiusSetter = kad
-        lo.ReserveInitialCleanup = true
     }

     localStore, err := storer.New(ctx, path, lo)
diff --git a/pkg/postage/mock/service.go b/pkg/postage/mock/service.go
index 12c47f28a60..194556b0153 100644
--- a/pkg/postage/mock/service.go
+++ b/pkg/postage/mock/service.go
@@ -6,6 +6,7 @@ package mock

 import (
     "bytes"
+    "context"
     "math/big"
     "sync"
@@ -24,8 +25,7 @@ func (f optionFunc) apply(r *mockPostage) { f(r) }

 // New creates a new mock postage service.
 func New(o ...Option) postage.Service {
     m := &mockPostage{
-        issuersMap:   make(map[string]*postage.StampIssuer),
-        issuersInUse: make(map[string]*postage.StampIssuer),
+        issuersMap: make(map[string]*postage.StampIssuer),
     }
     for _, v := range o {
         v.apply(m)
     }
@@ -47,10 +47,9 @@ func WithIssuer(s *postage.StampIssuer) Option {
 }

 type mockPostage struct {
-    issuersMap   map[string]*postage.StampIssuer
-    issuerLock   sync.Mutex
-    acceptAll    bool
-    issuersInUse map[string]*postage.StampIssuer
+    issuersMap map[string]*postage.StampIssuer
+    issuerLock sync.Mutex
+    acceptAll  bool
 }

 func (m *mockPostage) SetExpired() error {
@@ -88,7 +87,7 @@ func (m *mockPostage) StampIssuers() ([]*postage.StampIssuer, error) {
     return issuers, nil
 }

-func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, func(bool) error, error) {
+func (m *mockPostage) GetStampIssuer(_ context.Context, id []byte) (*postage.StampIssuer, func(bool) error, error) {
     if m.acceptAll {
         return postage.NewStampIssuer("test fallback", "test identity", id, big.NewInt(3), 24, 6, 1000, true), func(_ bool) error { return nil }, nil
     }
@@ -101,14 +100,7 @@ func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, func(bool
         return nil, nil, postage.ErrNotFound
     }

-    if _, inUse := m.issuersInUse[string(id)]; inUse {
-        return nil, nil, postage.ErrBatchInUse
-    }
-    m.issuersInUse[string(id)] = i
     return i, func(_ bool) error {
-        m.issuerLock.Lock()
-        defer m.issuerLock.Unlock()
-        delete(m.issuersInUse, string(id))
         return nil
     }, nil
 }
diff --git a/pkg/postage/postagecontract/contract_test.go b/pkg/postage/postagecontract/contract_test.go
index 611424a2d45..48da228c049 100644
--- a/pkg/postage/postagecontract/contract_test.go
+++ b/pkg/postage/postagecontract/contract_test.go
@@ -133,7 +133,7 @@ func TestCreateBatch(t *testing.T) {
             t.Fatalf("got wrong batchId. wanted %v, got %v", batchID, returnedID)
         }

-        si, _, err := postageMock.GetStampIssuer(returnedID)
+        si, _, err := postageMock.GetStampIssuer(context.Background(), returnedID)
         if err != nil {
             t.Fatal(err)
         }
@@ -297,7 +297,7 @@ func TestTopUpBatch(t *testing.T) {
             t.Fatal(err)
         }

-        si, _, err := postageMock.GetStampIssuer(batch.ID)
+        si, _, err := postageMock.GetStampIssuer(context.Background(), batch.ID)
         if err != nil {
             t.Fatal(err)
         }
@@ -483,7 +483,7 @@ func TestDiluteBatch(t *testing.T) {
             t.Fatal(err)
         }

-        si, _, err := postageMock.GetStampIssuer(batch.ID)
+        si, _, err := postageMock.GetStampIssuer(context.Background(), batch.ID)
         if err != nil {
             t.Fatal(err)
         }
diff --git a/pkg/postage/service.go b/pkg/postage/service.go
index 6920c47403e..8680dfb4968 100644
--- a/pkg/postage/service.go
+++ b/pkg/postage/service.go
@@ -5,6 +5,7 @@
 package postage

 import (
+    "context"
     "errors"
     "io"
     "math/big"
@@ -12,6 +13,7 @@ import (
     "github.com/ethersphere/bee/pkg/log"
     "github.com/ethersphere/bee/pkg/storage"
+    "golang.org/x/sync/semaphore"
 )

 const (
@@ -25,15 +27,13 @@ var (
     ErrNotFound = errors.New("not found")
     // ErrNotUsable is the error returned when issuer with given batch ID is not usable.
     ErrNotUsable = errors.New("not usable")
-    // ErrBatchInUse is the error returned when issuer with given batch ID is already in use.
-    ErrBatchInUse = errors.New("batch is in use by another upload process")
 )

 // Service is the postage service interface.
 type Service interface {
     Add(*StampIssuer) error
     StampIssuers() ([]*StampIssuer, error)
-    GetStampIssuer([]byte) (*StampIssuer, func(bool) error, error)
+    GetStampIssuer(context.Context, []byte) (*StampIssuer, func(bool) error, error)
     IssuerUsable(*StampIssuer) bool
     BatchEventListener
     BatchExpiryHandler
@@ -47,7 +47,7 @@ type service struct {
     store        storage.Store
     postageStore Storer
     chainID      int64
-    issuersInUse map[string]any
+    issuersInUse map[string]*semaphore.Weighted
 }

 // NewService constructs a new Service.
@@ -56,7 +56,7 @@ func NewService(store storage.Store, postageStore Storer, chainID int64) Service
         store:        store,
         postageStore: postageStore,
         chainID:      chainID,
-        issuersInUse: make(map[string]any),
+        issuersInUse: make(map[string]*semaphore.Weighted),
     }
 }

@@ -150,31 +150,44 @@ func (ps *service) IssuerUsable(st *StampIssuer) bool {
     return true
 }

-// GetStampIssuer finds a stamp issuer by batch ID.
-func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func(bool) error, error) {
+// GetStampIssuer finds a stamp issuer by batch ID. Only one caller can use the
+// stamp issuer at a time. The caller must call the returned release function
+// when done with the stamp issuer.
+func (ps *service) GetStampIssuer(ctx context.Context, batchID []byte) (*StampIssuer, func(bool) error, error) {
+
+    var issuerAccess *semaphore.Weighted
+    key := string(batchID)
+
     ps.lock.Lock()
-    defer ps.lock.Unlock()
+    if s, ok := ps.issuersInUse[key]; ok {
+        issuerAccess = s
+    } else {
+        issuerAccess = semaphore.NewWeighted(1)
+        ps.issuersInUse[key] = issuerAccess
+    }
+    ps.lock.Unlock()

-    if _, ok := ps.issuersInUse[string(batchID)]; ok {
-        return nil, nil, ErrBatchInUse
+    if err := issuerAccess.Acquire(ctx, 1); err != nil {
+        return nil, nil, err
     }
+
     item := NewStampIssuerItem(batchID)
     err := ps.store.Get(item)
     if errors.Is(err, storage.ErrNotFound) {
+        issuerAccess.Release(1)
         return nil, nil, ErrNotFound
     }
     if err != nil {
+        issuerAccess.Release(1)
        return nil, nil, err
     }
-
     if !ps.IssuerUsable(item.Issuer) {
+        issuerAccess.Release(1)
         return nil, nil, ErrNotUsable
     }
-    ps.issuersInUse[string(batchID)] = struct{}{}
+
     return item.Issuer, func(update bool) error {
-        ps.lock.Lock()
-        defer ps.lock.Unlock()
-        delete(ps.issuersInUse, string(batchID))
+        defer issuerAccess.Release(1)
         if !update {
             return nil
         }
diff --git a/pkg/postage/service_test.go b/pkg/postage/service_test.go
index 3f357f01637..d5710301f05 100644
--- a/pkg/postage/service_test.go
+++ b/pkg/postage/service_test.go
@@ -5,11 +5,13 @@
 package postage_test

 import (
+    "context"
     crand "crypto/rand"
     "errors"
     "io"
     "math/big"
     "testing"
+    "time"

     "github.com/ethersphere/bee/pkg/postage"
     pstoremock "github.com/ethersphere/bee/pkg/postage/batchstore/mock"
@@ -109,7 +111,7 @@ func TestGetStampIssuer(t *testing.T) {
     }
     t.Run("found", func(t *testing.T) {
         for _, id := range ids[1:4] {
-            st, save, err := ps.GetStampIssuer(id)
+            st, save, err := ps.GetStampIssuer(context.Background(), id)
             if err != nil {
                 t.Fatalf("expected no error, got %v", err)
             }
@@ -132,14 +134,14 @@
         }
     })
     t.Run("not found", func(t *testing.T) {
-        _, _, err := ps.GetStampIssuer(ids[0])
+        _, _, err := ps.GetStampIssuer(context.Background(), ids[0])
         if !errors.Is(err, postage.ErrNotFound) {
             t.Fatalf("expected ErrNotFound, got %v", err)
         }
     })
     t.Run("not usable", func(t *testing.T) {
         for _, id := range ids[4:] {
-            _, _, err := ps.GetStampIssuer(id)
+            _, _, err := ps.GetStampIssuer(context.Background(), id)
             if !errors.Is(err, postage.ErrNotUsable) {
                 t.Fatalf("expected ErrNotUsable, got %v", err)
             }
@@ -153,7 +155,7 @@
         if err != nil {
             t.Fatalf("expected no error, got %v", err)
         }
-        st, sv, err := ps.GetStampIssuer(b.ID)
+        st, sv, err := ps.GetStampIssuer(context.Background(), b.ID)
         if err != nil {
             t.Fatalf("expected no error, got %v", err)
         }
@@ -170,7 +172,7 @@
         if err != nil {
             t.Fatal(err)
         }
-        stampIssuer, save, err := ps.GetStampIssuer(ids[1])
+        stampIssuer, save, err := ps.GetStampIssuer(context.Background(), ids[1])
         if err != nil {
             t.Fatalf("expected no error, got %v", err)
         }
@@ -184,7 +186,7 @@
         if err != nil {
             t.Fatal(err)
         }
-        stampIssuer, save, err := ps.GetStampIssuer(ids[2])
+        stampIssuer, save, err := ps.GetStampIssuer(context.Background(), ids[2])
         if err != nil {
             t.Fatalf("expected no error, got %v", err)
         }
@@ -197,24 +199,46 @@
         }
     })
     t.Run("in use", func(t *testing.T) {
-        _, save1, err := ps.GetStampIssuer(ids[1])
+        _, save1, err := ps.GetStampIssuer(context.Background(), ids[1])
         if err != nil {
             t.Fatal(err)
         }
-        _, save2, err := ps.GetStampIssuer(ids[2])
+        _, save2, err := ps.GetStampIssuer(context.Background(), ids[2])
         if err != nil {
             t.Fatal(err)
         }
         _ = save2(true)
-        _, _, err = ps.GetStampIssuer(ids[1])
-        if !errors.Is(err, postage.ErrBatchInUse) {
-            t.Fatalf("expected ErrBatchInUse, got %v", err)
-        }
+        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+        defer cancel()
+
+        _, _, err = ps.GetStampIssuer(ctx, ids[1])
+        if !errors.Is(err, context.DeadlineExceeded) {
+            t.Fatalf("expected context.DeadlineExceeded, got %v", err)
+        }
+        // ensure we get access once the first one is saved
+        done := make(chan struct{})
+        errC := make(chan error, 1)
+        go func() {
+            _, save12, err := ps.GetStampIssuer(context.Background(), ids[1])
+            if err != nil {
+                errC <- err
+                return
+            }
+            _ = save12(true)
+            close(done)
+        }()
         _ = save1(true)
+        select {
+        case <-done:
+        case err := <-errC:
+            t.Fatal(err)
+        case <-time.After(time.Second):
+            t.Fatal("timeout")
+        }
     })
     t.Run("save without update", func(t *testing.T) {
-        is, save, err := ps.GetStampIssuer(ids[1])
+        is, save, err := ps.GetStampIssuer(context.Background(), ids[1])
         if err != nil {
             t.Fatal(err)
         }
@@ -231,7 +255,7 @@
             t.Fatal(err)
         }

-        is, _, err = ps.GetStampIssuer(ids[1])
+        is, _, err = ps.GetStampIssuer(context.Background(), ids[1])
         if err != nil {
             t.Fatal(err)
         }
diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index d029974a414..c6fce400352 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -55,6 +55,11 @@ const (
     DefaultMaxPage uint64 = 250
 )

+// singleflight key for intervals
+func sfKey(bin uint8, start uint64) string {
+    return fmt.Sprintf("%d-%d", bin, start)
+}
+
 // how many maximum chunks in a batch

 // Interface is the PullSync interface.
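The sfKey helper added above gives every (bin, start) interval request a stable key, and the hunks that follow use it: concurrent pullsync requests for the same interval collapse into a single storage query, and the cached call is dropped again with Forget once the underlying data turns out to be missing. A minimal, self-contained sketch of that pattern, using golang.org/x/sync/singleflight instead of the context-aware singleflight group this package wires in (the intervals type and fetch callback below are illustrative, not bee APIs):

package main

import (
	"errors"
	"fmt"

	"golang.org/x/sync/singleflight"
)

var errNotFound = errors.New("not found")

// sfKey mirrors the helper introduced in the patch: one key per (bin, start) pair.
func sfKey(bin uint8, start uint64) string {
	return fmt.Sprintf("%d-%d", bin, start)
}

type intervals struct {
	sf singleflight.Group
}

// collect deduplicates concurrent lookups for the same interval. When the lookup
// fails with errNotFound the key is forgotten, so the next caller retries instead
// of inheriting the failed in-flight result.
func (i *intervals) collect(bin uint8, start uint64, fetch func() ([]string, error)) ([]string, error) {
	key := sfKey(bin, start)
	v, err, _ := i.sf.Do(key, func() (interface{}, error) {
		return fetch()
	})
	if err != nil {
		if errors.Is(err, errNotFound) {
			i.sf.Forget(key)
		}
		return nil, err
	}
	return v.([]string), nil
}

func main() {
	i := &intervals{}
	addrs, err := i.collect(3, 100, func() ([]string, error) {
		return []string{"chunk-a", "chunk-b"}, nil
	})
	fmt.Println(addrs, err)
}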
@@ -339,6 +344,9 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
     chs, err := s.processWant(ctx, offer, &want)
     if err != nil {
+        if errors.Is(err, storage.ErrNotFound) {
+            s.intervalsSF.Forget(sfKey(uint8(rn.Bin), rn.Start))
+        }
         return fmt.Errorf("process want: %w", err)
     }

@@ -389,7 +397,7 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
         topmost uint64
     }

-    v, _, err := s.intervalsSF.Do(ctx, fmt.Sprintf("%v-%v", bin, start), func(ctx context.Context) (interface{}, error) {
+    v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (interface{}, error) {
         var (
             chs     []*storer.BinC
             topmost uint64
diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go
index ed71d2f6d6e..2fff5739af2 100644
--- a/pkg/pusher/pusher.go
+++ b/pkg/pusher/pusher.go
@@ -129,7 +129,7 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)

     // inflight.set handles the backpressure for the maximum amount of inflight chunks
     // and duplicate handling.
-    chunks, unsubscribe := s.storer.SubscribePush(ctx)
+    chunks, unsubscribe := s.storer.SubscribePush(cctx)
     defer func() {
         unsubscribe()
         cancel()
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index a793678d8c9..95bc1c38e33 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -112,21 +112,14 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
         // 4. Update the stamp index
         newStampIndex = false

-        oldChunkPO := swarm.Proximity(r.baseAddr.Bytes(), item.ChunkAddress.Bytes())
-        oldChunk := &batchRadiusItem{Bin: oldChunkPO, BatchID: chunk.Stamp().BatchID(), Address: item.ChunkAddress}
-        err := indexStore.Get(oldChunk)
-        if err != nil {
-            return false, fmt.Errorf("failed getting old chunk item to replace: %w", err)
-        }
-
-        err = removeChunk(ctx, store, oldChunk)
+        err := r.DeleteChunk(ctx, store, item.ChunkAddress, chunk.Stamp().BatchID())
         if err != nil {
             return false, fmt.Errorf("failed removing older chunk: %w", err)
         }

         r.logger.Debug(
             "replacing chunk stamp index",
-            "old_chunk", oldChunk.Address.String(),
-            "new_chunk", chunk.Address().String(),
+            "old_chunk", item.ChunkAddress,
+            "new_chunk", chunk.Address(),
             "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
         )
diff --git a/pkg/storer/metrics.go b/pkg/storer/metrics.go
index 990186b352d..a4e040c9e15 100644
--- a/pkg/storer/metrics.go
+++ b/pkg/storer/metrics.go
@@ -26,6 +26,7 @@ type metrics struct {
     EvictedChunkCount   prometheus.Counter
     ExpiredChunkCount   prometheus.Counter
     OverCapTriggerCount prometheus.Counter
+    ExpiredBatchCount   prometheus.Counter
 }

 // newMetrics is a convenient constructor for creating new metrics.
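The hunk that follows registers the new ExpiredBatchCount counter inside newMetrics. For reference, a small self-contained sketch of the same kind of counter with the Prometheus client library; the namespace, subsystem and the registry/HTTP wiring here are illustrative placeholders, not bee's actual metrics plumbing:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// expiredBatchCount mirrors the counter added by this patch: it only ever
// increases, one increment per expired batch the eviction worker has processed.
var expiredBatchCount = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "example", // placeholder; bee derives this from its own metrics namespace
	Subsystem: "reserve", // placeholder subsystem label
	Name:      "expired_batch_count",
	Help:      "Number of batches expired, that were processed.",
})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(expiredBatchCount)

	// Simulate the eviction worker handling two expired batches.
	expiredBatchCount.Inc()
	expiredBatchCount.Inc()

	// Expose the counter on /metrics so it can be scraped.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":8080", nil)
}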
@@ -107,6 +108,14 @@ func newMetrics() metrics {
                 Help: "Number of times the reserve was over capacity and triggered an eviction.",
             },
         ),
+        ExpiredBatchCount: prometheus.NewCounter(
+            prometheus.CounterOpts{
+                Namespace: m.Namespace,
+                Subsystem: subsystem,
+                Name:      "expired_batch_count",
+                Help:      "Number of batches expired, that were processed.",
+            },
+        ),
     }
 }
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index da130e814ea..aa9bf9d5ef0 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -23,6 +23,8 @@ const (
     reserveOverCapacity  = "reserveOverCapacity"
     reserveUnreserved    = "reserveUnreserved"
     reserveUpdateLockKey = "reserveUpdateLockKey"
+    batchExpiry          = "batchExpiry"
+    expiredBatchAccess   = "expiredBatchAccess"

     cleanupDur = time.Hour * 6
 )
@@ -37,17 +39,21 @@ type Syncer interface {

 func threshold(capacity int) int { return capacity * 5 / 10 }

-func (db *DB) reserveWorker(ctx context.Context, warmupDur, wakeUpDur time.Duration, radius func() (uint8, error)) {
-    defer db.reserveWg.Done()
-
+func (db *DB) startReserveWorkers(
+    ctx context.Context,
+    warmupDur, wakeUpDur time.Duration,
+    radius func() (uint8, error),
+) {
     ctx, cancel := context.WithCancel(ctx)
     go func() {
         <-db.quit
         cancel()
     }()

-    overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
-    defer overCapUnsub()
+    // start eviction worker first as there could be batch expirations because of
+    // initial contract sync
+    db.reserveWg.Add(1)
+    go db.evictionWorker(ctx)

     select {
     case <-time.After(warmupDur):
@@ -74,20 +80,20 @@
     // syncing can now begin now that the reserver worker is running
     db.syncer.Start(ctx)

+    db.reserveWg.Add(1)
+    go db.radiusWorker(ctx, wakeUpDur)
+}
+
+func (db *DB) radiusWorker(ctx context.Context, wakeUpDur time.Duration) {
+    defer db.reserveWg.Done()
+
     radiusWakeUpTicker := time.NewTicker(wakeUpDur)
     defer radiusWakeUpTicker.Stop()

-    cleanUpTicker := time.NewTicker(cleanupDur)
-    defer cleanUpTicker.Stop()
-
     for {
         select {
-        case <-overCapTrigger:
-            err := db.unreserve(ctx)
-            if err != nil {
-                db.logger.Error(err, "reserve unreserve")
-            }
-            db.metrics.OverCapTriggerCount.Inc()
+        case <-ctx.Done():
+            return
         case <-radiusWakeUpTicker.C:
             radius := db.reserve.Radius()
             if db.reserve.Size() < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > 0 {
@@ -99,12 +105,63 @@
                 db.logger.Info("reserve radius decrease", "radius", radius)
             }
             db.metrics.StorageRadius.Set(float64(radius))
+        }
+    }
+}
+
+func (db *DB) evictionWorker(ctx context.Context) {
+    defer db.reserveWg.Done()
+
+    batchExpiryTrigger, batchExpiryUnsub := db.events.Subscribe(batchExpiry)
+    defer batchExpiryUnsub()
+
+    overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
+    defer overCapUnsub()
+
+    cleanupExpired := func() {
+        db.lock.Lock(expiredBatchAccess)
+        batchesToEvict := make([][]byte, len(db.expiredBatches))
+        copy(batchesToEvict, db.expiredBatches)
+        db.expiredBatches = nil
+        db.lock.Unlock(expiredBatchAccess)
+
+        defer db.events.Trigger(reserveUnreserved)
+
+        if len(batchesToEvict) > 0 {
+            for _, batchID := range batchesToEvict {
+                err := db.evictBatch(ctx, batchID, swarm.MaxBins)
+                if err != nil {
+                    db.logger.Error(err, "evict batch")
+                }
+                db.metrics.ExpiredBatchCount.Inc()
+            }
+        }
+    }
+
+    cleanUpTicker := time.NewTicker(cleanupDur)
+    defer cleanUpTicker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-overCapTrigger:
+            // check if there are expired batches first
+            cleanupExpired()
+
+            err := db.unreserve(ctx)
+            if err != nil {
+                db.logger.Error(err, "reserve unreserve")
+            }
+            db.metrics.OverCapTriggerCount.Inc()
+        case <-batchExpiryTrigger:
+            cleanupExpired()
         case <-cleanUpTicker.C:
+            cleanupExpired()
+
             if err := db.reserveCleanup(ctx); err != nil {
                 db.logger.Error(err, "cleanup")
             }
-        case <-db.quit:
-            return
         }
     }
 }
@@ -201,19 +258,26 @@ func (db *DB) EvictBatch(ctx context.Context, batchID []byte) (err error) {
         // if reserve is not configured, do nothing
         return nil
     }
+
+    db.lock.Lock(expiredBatchAccess)
+    db.expiredBatches = append(db.expiredBatches, batchID)
+    db.lock.Unlock(expiredBatchAccess)
+
+    db.events.Trigger(batchExpiry)
+
+    return nil
+}
+
+func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (retErr error) {
     dur := captureDuration(time.Now())
     defer func() {
         db.metrics.MethodCallsDuration.WithLabelValues("reserve", "EvictBatch").Observe(dur())
-        if err == nil {
+        if retErr == nil {
             db.metrics.MethodCalls.WithLabelValues("reserve", "EvictBatch", "success").Inc()
         } else {
             db.metrics.MethodCalls.WithLabelValues("reserve", "EvictBatch", "failure").Inc()
         }
     }()
-    return db.evictBatch(ctx, batchID, swarm.MaxBins)
-}
-
-func (db *DB) evictBatch(ctx context.Context, batchID []byte, upToBin uint8) (err error) {

     for b := uint8(0); b < upToBin; b++ {
@@ -225,14 +289,32 @@

         var evicted int

-        err = db.reserve.IterateBatchBin(ctx, db.repo, b, batchID, func(address swarm.Address) (bool, error) {
-            err := db.removeChunk(ctx, address, batchID)
+        type evictItems struct {
+            address swarm.Address
+            batchID []byte
+        }
+
+        var itemsToEvict []evictItems
+
+        err := db.reserve.IterateBatchBin(ctx, db.repo, b, batchID, func(address swarm.Address) (bool, error) {
+            itemsToEvict = append(itemsToEvict, evictItems{
+                address: address,
+                batchID: batchID,
+            })
+            return false, nil
+        })
+        if err != nil {
+            return fmt.Errorf("reserve: iterate batch bin: %w", err)
+        }
+
+        for _, item := range itemsToEvict {
+            err = db.removeChunk(ctx, item.address, item.batchID)
             if err != nil {
-                return false, err
+                retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
+                continue
             }
             evicted++
-            return false, nil
-        })
+        }

         // if there was an error, we still need to update the chunks that have already
         // been evicted from the reserve
@@ -245,8 +327,8 @@
         } else {
             db.metrics.EvictedChunkCount.Add(float64(evicted))
         }
-        if err != nil {
-            return err
+        if retErr != nil {
+            return retErr
         }
     }

@@ -286,24 +368,35 @@ func (db *DB) reserveCleanup(ctx context.Context) error {
         db.metrics.ReserveSize.Set(float64(db.reserve.Size()))
     }()

-    ids := map[string]struct{}{}
+    var itemsToEvict []reserve.ChunkItem

-    err := db.batchstore.Iterate(func(b *postage.Batch) (bool, error) {
-        ids[string(b.ID)] = struct{}{}
+    err := db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) {
+        if exists, err := db.batchstore.Exists(ci.BatchID); err == nil && !exists {
+            itemsToEvict = append(itemsToEvict, ci)
+        }
         return false, nil
     })
     if err != nil {
         return err
     }

-    return db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) {
-        if _, ok := ids[string(ci.BatchID)]; !ok {
-            removed++
-            db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(ci.BatchID))
-            return false, db.removeChunk(ctx, ci.ChunkAddress, ci.BatchID)
+    expiredBatches := make(map[string]struct{})
+    var retErr error
+
+    for _, item := range itemsToEvict {
+        err = db.removeChunk(ctx, item.ChunkAddress, item.BatchID)
+        if err != nil {
+            retErr = errors.Join(retErr, fmt.Errorf("reserve: remove chunk %v: %w", item, err))
+            continue
         }
-        return false, nil
-    })
+        removed++
+        if _, ok := expiredBatches[string(item.BatchID)]; !ok {
+            expiredBatches[string(item.BatchID)] = struct{}{}
+            db.logger.Debug("cleanup expired batch", "batch_id", hex.EncodeToString(item.BatchID))
+        }
+    }
+
+    return retErr
 }

 func (db *DB) unreserve(ctx context.Context) (err error) {
@@ -320,6 +413,10 @@
     radius := db.reserve.Radius()
     defer db.events.Trigger(reserveUnreserved)

+    if db.reserve.IsWithinCapacity() {
+        return nil
+    }
+
     for radius < swarm.MaxBins {

         err := db.batchstore.Iterate(func(b *postage.Batch) (bool, error) {
diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go
index 627f9db703c..ed5f977f705 100644
--- a/pkg/storer/reserve_test.go
+++ b/pkg/storer/reserve_test.go
@@ -203,6 +203,15 @@ func TestEvictBatch(t *testing.T) {
         t.Fatal(err)
     }

+    gotUnreserveSignal := make(chan struct{})
+    go func() {
+        defer close(gotUnreserveSignal)
+        c, unsub := st.Events().Subscribe("reserveUnreserved")
+        defer unsub()
+        <-c
+    }()
+    <-gotUnreserveSignal
+
     reserve := st.Reserve()

     for _, ch := range chunks {
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index 598f5a7698d..ce47bbf5133 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -382,7 +382,6 @@ type Options struct {
     RadiusSetter topology.SetStorageRadiuser
     StateStore   storage.StateStorer

-    ReserveInitialCleanup bool
     ReserveCapacity       int
     ReserveWakeUpDuration time.Duration
 }
@@ -427,6 +426,7 @@ type DB struct {
     setSyncerOnce sync.Once
     syncer        Syncer
     opts          workerOpts
+    expiredBatches [][]byte
 }

 type workerOpts struct {
@@ -508,13 +508,6 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
         }
         db.reserve = rs

-        if opts.ReserveInitialCleanup {
-            err = db.reserveCleanup(ctx)
-            if err != nil {
-                return nil, err
-            }
-        }
-
         db.metrics.StorageRadius.Set(float64(rs.Radius()))
         db.metrics.ReserveSize.Set(float64(rs.Size()))
     }
@@ -588,8 +581,7 @@ func (db *DB) SetRetrievalService(r retrieval.Interface) {
 func (db *DB) StartReserveWorker(ctx context.Context, s Syncer, radius func() (uint8, error)) {
     db.setSyncerOnce.Do(func() {
         db.syncer = s
-        db.reserveWg.Add(1)
-        go db.reserveWorker(ctx, db.opts.warmupDuration, db.opts.wakeupDuration, radius)
+        go db.startReserveWorkers(ctx, db.opts.warmupDuration, db.opts.wakeupDuration, radius)
     })
 }
diff --git a/pkg/storer/subscribe_push.go b/pkg/storer/subscribe_push.go
index b230501df7d..a47e4b1e419 100644
--- a/pkg/storer/subscribe_push.go
+++ b/pkg/storer/subscribe_push.go
@@ -6,8 +6,10 @@ package storer

 import (
     "context"
+    "errors"
     "sync"

+    storage "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/storer/internal/upload"
     "github.com/ethersphere/bee/pkg/swarm"
 )
@@ -53,7 +55,10 @@ func (db *DB) SubscribePush(ctx context.Context) (<-chan swarm.Chunk, func()) {
             })

             if err != nil {
-                return
+                if !errors.Is(err, storage.ErrNotFound) {
+                    db.logger.Error(err, "subscribe push: iterate error")
+                    return
+                }
             }

             select {
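The behavioural core of this patch is the pkg/postage/service.go change above: instead of failing fast with ErrBatchInUse, GetStampIssuer now parks concurrent callers on a per-batch semaphore until the current holder calls the returned release function or the waiter's context expires. A compact standalone sketch of that pattern with golang.org/x/sync/semaphore; the guard type and acquire helper are illustrative names, not the bee API:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// guard hands out exclusive, context-aware access to a resource keyed by batch ID.
type guard struct {
	mu    sync.Mutex
	inUse map[string]*semaphore.Weighted
}

// acquire blocks until the per-key semaphore is free or ctx is done.
// The returned release function must be called exactly once.
func (g *guard) acquire(ctx context.Context, key string) (release func(), err error) {
	g.mu.Lock()
	sem, ok := g.inUse[key]
	if !ok {
		sem = semaphore.NewWeighted(1)
		g.inUse[key] = sem
	}
	g.mu.Unlock()

	if err := sem.Acquire(ctx, 1); err != nil {
		return nil, err // e.g. context.DeadlineExceeded while waiting
	}
	return func() { sem.Release(1) }, nil
}

func main() {
	g := &guard{inUse: make(map[string]*semaphore.Weighted)}

	release, err := g.acquire(context.Background(), "batch-1")
	if err != nil {
		panic(err)
	}

	// A second caller with a short deadline now times out instead of
	// receiving a hard "batch is in use" error, matching the updated test.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if _, err := g.acquire(ctx, "batch-1"); err != nil {
		fmt.Println("waiter gave up:", err)
	}

	release()
}

Blocking on the semaphore rather than returning an error is what lets the API handlers drop their ErrBatchInUse branches earlier in this patch: uploads against a busy batch simply queue until the batch becomes available or the request context is cancelled.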