Skip to content

Commit

Permalink
fix: issues from public testnet testing (#4202)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Jul 3, 2023
1 parent 2ace005 commit 78120b9
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 126 deletions.
6 changes: 3 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,13 +795,13 @@ func (p *putterSessionWrapper) Cleanup() error {
return errors.Join(p.PutterSession.Cleanup(), p.save(false))
}

func (s *Service) getStamper(batchID []byte) (postage.Stamper, func(bool) error, error) {
func (s *Service) getStamper(ctx context.Context, batchID []byte) (postage.Stamper, func(bool) error, error) {
exists, err := s.batchStore.Exists(batchID)
if err != nil {
return nil, nil, fmt.Errorf("batch exists: %w", err)
}

issuer, save, err := s.post.GetStampIssuer(batchID)
issuer, save, err := s.post.GetStampIssuer(ctx, batchID)
if err != nil {
return nil, nil, fmt.Errorf("stamp issuer: %w", err)
}
Expand All @@ -818,7 +818,7 @@ func (s *Service) newStamperPutter(ctx context.Context, opts putterOptions) (sto
return nil, errUnsupportedDevNodeOperation
}

stamper, save, err := s.getStamper(opts.BatchID)
stamper, save, err := s.getStamper(ctx, opts.BatchID)
if err != nil {
return nil, fmt.Errorf("get stamper: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
case errors.Is(err, postage.ErrBatchInUse):
jsonhttp.BadRequest(w, postage.ErrBatchInUse)
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
case errors.Is(err, postage.ErrBatchInUse):
jsonhttp.BadRequest(w, postage.ErrBatchInUse)
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
case errors.Is(err, postage.ErrBatchInUse):
jsonhttp.BadRequest(w, postage.ErrBatchInUse)
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/chunk_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
case errors.Is(err, postage.ErrBatchInUse):
jsonhttp.BadRequest(w, postage.ErrBatchInUse)
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,6 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
case errors.Is(err, postage.ErrBatchInUse):
jsonhttp.BadRequest(w, postage.ErrBatchInUse)
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/postage.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Service) postageGetStampBucketsHandler(w http.ResponseWriter, r *http.R
}
hexBatchID := hex.EncodeToString(paths.BatchID)

issuer, save, err := s.post.GetStampIssuer(paths.BatchID)
issuer, save, err := s.post.GetStampIssuer(r.Context(), paths.BatchID)
if err != nil {
logger.Debug("get stamp issuer: get issuer failed", "batch_id", hexBatchID, "error", err)
logger.Error(nil, "get stamp issuer: get issuer failed")
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *Service) pssPostHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "pss send failed")
return
}
i, save, err := s.post.GetStampIssuer(headers.BatchID)
i, save, err := s.post.GetStampIssuer(r.Context(), headers.BatchID)
if err != nil {
logger.Debug("get postage batch issuer failed", "batch_id", hex.EncodeToString(headers.BatchID), "error", err)
logger.Error(nil, "get postage batch issuer failed")
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, postage.ErrBatchInUse):
jsonhttp.BadRequest(w, postage.ErrBatchInUse)
default:
jsonhttp.BadRequest(w, nil)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/stewardship.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request)
} else {
batchID = headers.BatchID
}
stamper, save, err := s.getStamper(batchID)
stamper, save, err := s.getStamper(r.Context(), batchID)
if err != nil {
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
Expand Down
1 change: 0 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,6 @@ func NewBee(
lo.ReserveCapacity = ReserveCapacity
lo.ReserveWakeUpDuration = reserveWakeUpDuration
lo.RadiusSetter = kad
lo.ReserveInitialCleanup = true
}

localStore, err := storer.New(ctx, path, lo)
Expand Down
20 changes: 6 additions & 14 deletions pkg/postage/mock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mock

import (
"bytes"
"context"
"math/big"
"sync"

Expand All @@ -24,8 +25,7 @@ func (f optionFunc) apply(r *mockPostage) { f(r) }
// New creates a new mock postage service.
func New(o ...Option) postage.Service {
m := &mockPostage{
issuersMap: make(map[string]*postage.StampIssuer),
issuersInUse: make(map[string]*postage.StampIssuer),
issuersMap: make(map[string]*postage.StampIssuer),
}
for _, v := range o {
v.apply(m)
Expand All @@ -47,10 +47,9 @@ func WithIssuer(s *postage.StampIssuer) Option {
}

type mockPostage struct {
issuersMap map[string]*postage.StampIssuer
issuerLock sync.Mutex
acceptAll bool
issuersInUse map[string]*postage.StampIssuer
issuersMap map[string]*postage.StampIssuer
issuerLock sync.Mutex
acceptAll bool
}

func (m *mockPostage) SetExpired() error {
Expand Down Expand Up @@ -88,7 +87,7 @@ func (m *mockPostage) StampIssuers() ([]*postage.StampIssuer, error) {
return issuers, nil
}

func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, func(bool) error, error) {
func (m *mockPostage) GetStampIssuer(_ context.Context, id []byte) (*postage.StampIssuer, func(bool) error, error) {
if m.acceptAll {
return postage.NewStampIssuer("test fallback", "test identity", id, big.NewInt(3), 24, 6, 1000, true), func(_ bool) error { return nil }, nil
}
Expand All @@ -101,14 +100,7 @@ func (m *mockPostage) GetStampIssuer(id []byte) (*postage.StampIssuer, func(bool
return nil, nil, postage.ErrNotFound
}

if _, inUse := m.issuersInUse[string(id)]; inUse {
return nil, nil, postage.ErrBatchInUse
}
m.issuersInUse[string(id)] = i
return i, func(_ bool) error {
m.issuerLock.Lock()
defer m.issuerLock.Unlock()
delete(m.issuersInUse, string(id))
return nil
}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/postage/postagecontract/contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestCreateBatch(t *testing.T) {
t.Fatalf("got wrong batchId. wanted %v, got %v", batchID, returnedID)
}

si, _, err := postageMock.GetStampIssuer(returnedID)
si, _, err := postageMock.GetStampIssuer(context.Background(), returnedID)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestTopUpBatch(t *testing.T) {
t.Fatal(err)
}

si, _, err := postageMock.GetStampIssuer(batch.ID)
si, _, err := postageMock.GetStampIssuer(context.Background(), batch.ID)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestDiluteBatch(t *testing.T) {
t.Fatal(err)
}

si, _, err := postageMock.GetStampIssuer(batch.ID)
si, _, err := postageMock.GetStampIssuer(context.Background(), batch.ID)
if err != nil {
t.Fatal(err)
}
Expand Down
43 changes: 28 additions & 15 deletions pkg/postage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package postage

import (
"context"
"errors"
"io"
"math/big"
"sync"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -25,15 +27,13 @@ var (
ErrNotFound = errors.New("not found")
// ErrNotUsable is the error returned when issuer with given batch ID is not usable.
ErrNotUsable = errors.New("not usable")
// ErrBatchInUse is the error returned when issuer with given batch ID is already in use.
ErrBatchInUse = errors.New("batch is in use by another upload process")
)

// Service is the postage service interface.
type Service interface {
Add(*StampIssuer) error
StampIssuers() ([]*StampIssuer, error)
GetStampIssuer([]byte) (*StampIssuer, func(bool) error, error)
GetStampIssuer(context.Context, []byte) (*StampIssuer, func(bool) error, error)
IssuerUsable(*StampIssuer) bool
BatchEventListener
BatchExpiryHandler
Expand All @@ -47,7 +47,7 @@ type service struct {
store storage.Store
postageStore Storer
chainID int64
issuersInUse map[string]any
issuersInUse map[string]*semaphore.Weighted
}

// NewService constructs a new Service.
Expand All @@ -56,7 +56,7 @@ func NewService(store storage.Store, postageStore Storer, chainID int64) Service
store: store,
postageStore: postageStore,
chainID: chainID,
issuersInUse: make(map[string]any),
issuersInUse: make(map[string]*semaphore.Weighted),
}
}

Expand Down Expand Up @@ -150,31 +150,44 @@ func (ps *service) IssuerUsable(st *StampIssuer) bool {
return true
}

// GetStampIssuer finds a stamp issuer by batch ID.
func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func(bool) error, error) {
// GetStampIssuer finds a stamp issuer by batch ID. Only one caller can use the
// stamp issuer at a time. The caller must call the returned release function
// when done with the stamp issuer.
func (ps *service) GetStampIssuer(ctx context.Context, batchID []byte) (*StampIssuer, func(bool) error, error) {

var issuerAccess *semaphore.Weighted
key := string(batchID)

ps.lock.Lock()
defer ps.lock.Unlock()
if s, ok := ps.issuersInUse[key]; ok {
issuerAccess = s
} else {
issuerAccess = semaphore.NewWeighted(1)
ps.issuersInUse[key] = issuerAccess
}
ps.lock.Unlock()

if _, ok := ps.issuersInUse[string(batchID)]; ok {
return nil, nil, ErrBatchInUse
if err := issuerAccess.Acquire(ctx, 1); err != nil {
return nil, nil, err
}

item := NewStampIssuerItem(batchID)
err := ps.store.Get(item)
if errors.Is(err, storage.ErrNotFound) {
issuerAccess.Release(1)
return nil, nil, ErrNotFound
}
if err != nil {
issuerAccess.Release(1)
return nil, nil, err
}

if !ps.IssuerUsable(item.Issuer) {
issuerAccess.Release(1)
return nil, nil, ErrNotUsable
}
ps.issuersInUse[string(batchID)] = struct{}{}

return item.Issuer, func(update bool) error {
ps.lock.Lock()
defer ps.lock.Unlock()
delete(ps.issuersInUse, string(batchID))
defer issuerAccess.Release(1)
if !update {
return nil
}
Expand Down
Loading

0 comments on commit 78120b9

Please sign in to comment.