feat(pullsync): rate limit handler (#4799)
istae authored Sep 9, 2024
1 parent 8c390f6 commit 2d75623
Showing 6 changed files with 180 additions and 112 deletions.
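The heart of the change is a per-peer token bucket on the serving side of pullsync: after delivering a batch of chunks, the handler waits for as many tokens as chunks it just sent, so any single peer is held to roughly 100 chunks per second, bursting up to one full page (250 chunks). Below is a minimal, self-contained sketch of that pattern using golang.org/x/time/rate, which the pkg/ratelimit wrapper in this diff is built on; the names perPeerLimiter and serveBatch are illustrative, not from the Bee codebase.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// perPeerLimiter is an illustrative stand-in for Bee's pkg/ratelimit.Limiter:
// one token bucket per peer key, refilling one token per `every` with the
// given burst size.
type perPeerLimiter struct {
	mtx     sync.Mutex
	buckets map[string]*rate.Limiter
	limit   rate.Limit
	burst   int
}

func newPerPeerLimiter(every time.Duration, burst int) *perPeerLimiter {
	return &perPeerLimiter{
		buckets: make(map[string]*rate.Limiter),
		limit:   rate.Every(every),
		burst:   burst,
	}
}

func (l *perPeerLimiter) get(peer string) *rate.Limiter {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	b, ok := l.buckets[peer]
	if !ok {
		b = rate.NewLimiter(l.limit, l.burst)
		l.buckets[peer] = b
	}
	return b
}

// serveBatch stands in for the pullsync handler: deliver the chunks first,
// then charge the peer's bucket so its *next* request is slowed down.
func serveBatch(ctx context.Context, l *perPeerLimiter, peer string, chunksSent int) error {
	// ... chunks are written to the stream here ...
	start := time.Now()
	if err := l.get(peer).WaitN(ctx, max(1, chunksSent)); err != nil {
		return fmt.Errorf("rate limiter: %w", err)
	}
	if waited := time.Since(start); waited > time.Millisecond {
		fmt.Printf("rate limited %s for %v\n", peer, waited)
	}
	return nil
}

func main() {
	// 100 chunks/s per peer, burst of one full page (250 chunks).
	limiter := newPerPeerLimiter(time.Second/100, 250)

	ctx := context.Background()
	// The first full page drains the bucket; the second has to wait ~2.5s.
	for i := 0; i < 2; i++ {
		if err := serveBatch(ctx, limiter, "peer-a", 250); err != nil {
			panic(err)
		}
	}
}
```

Charging the bucket after the send means the first page is never delayed; only a peer that keeps requesting faster than the configured rate gets held back on its subsequent requests.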
6 changes: 0 additions & 6 deletions pkg/hive/hive.go
@@ -61,7 +61,6 @@ type Service struct {
metrics metrics
inLimiter *ratelimit.Limiter
outLimiter *ratelimit.Limiter
clearMtx sync.Mutex
quit chan struct{}
wg sync.WaitGroup
peersChan chan pb.Peers
@@ -243,13 +242,8 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
}

func (s *Service) disconnect(peer p2p.Peer) error {

s.clearMtx.Lock()
defer s.clearMtx.Unlock()

s.inLimiter.Clear(peer.Address.ByteString())
s.outLimiter.Clear(peer.Address.ByteString())

return nil
}

12 changes: 6 additions & 6 deletions pkg/puller/puller.go
@@ -258,9 +258,11 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
}

/*
The syncing behavior diverges for peers outside and winthin the storage radius.
The syncing behavior diverges for peers outside and within the storage radius.
For neighbor peers, we sync ALL bins greater than or equal to the storage radius.
For peers with PO lower than the storage radius, we must sync ONLY the bin that is the PO.
For peers with a PO lower than the storage radius, and even lower than the allowed minimum threshold,
no syncing is done.
*/

if peer.po >= storageRadius {
@@ -289,9 +291,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
}
} else {
for bin := uint8(0); bin < p.bins; bin++ {
peer.cancelBin(bin)
}
peer.stop()
}

return nil
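The comment block above describes three cases for a peer's proximity order (PO) relative to the storage radius. A compact sketch of just that selection logic, where `minThreshold` and the `pickBins` helper are hypothetical stand-ins for the real puller fields:

```go
package main

import "fmt"

// pickBins returns which bins to sync for a peer, following the policy in the
// comment above: neighbours sync every bin >= the storage radius, shallower
// peers sync only their PO bin, and peers below the minimum threshold sync nothing.
func pickBins(po, storageRadius, minThreshold, bins uint8) []uint8 {
	switch {
	case po >= storageRadius:
		out := make([]uint8, 0, bins-storageRadius)
		for bin := storageRadius; bin < bins; bin++ {
			out = append(out, bin)
		}
		return out
	case po >= minThreshold:
		return []uint8{po}
	default:
		return nil // too shallow: don't sync at all
	}
}

func main() {
	const bins, radius, minThreshold = 32, 10, 4
	fmt.Println(pickBins(12, radius, minThreshold, bins)) // neighbour: bins 10..31
	fmt.Println(pickBins(7, radius, minThreshold, bins))  // shallow peer: just bin 7
	fmt.Println(pickBins(2, radius, minThreshold, bins))  // below threshold: none
}
```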
@@ -356,11 +356,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
}

_ = p.limiter.WaitN(ctx, count)

if isHistorical {
p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
p.rate.Add(count)
// rate limit historical syncing
_ = p.limiter.WaitN(ctx, count)
} else {
p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
}
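On the puller side, the diff above moves the `p.limiter.WaitN(ctx, count)` call inside the `isHistorical` branch, so only historical syncing is paced while live chunks flow through unthrottled. A hedged sketch of that branch structure; the 500 chunks/s budget and the `syncBatch` closure are illustrative, not the real puller internals:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Illustrative global budget for historical syncing: 500 chunks/s,
	// with room for one full page (250 chunks) as a burst.
	limiter := rate.NewLimiter(rate.Every(time.Second/500), 250)
	ctx := context.Background()

	syncBatch := func(isHistorical bool, count int) {
		if isHistorical {
			// Historical chunks are not time critical, so pace them.
			_ = limiter.WaitN(ctx, count)
			fmt.Printf("synced %d historical chunks (rate limited)\n", count)
			return
		}
		// Live chunks bypass the limiter entirely.
		fmt.Printf("synced %d live chunks (unthrottled)\n", count)
	}

	syncBatch(true, 250)  // may block until the bucket refills
	syncBatch(false, 250) // never blocks
}
```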
207 changes: 115 additions & 92 deletions pkg/pullsync/pullsync.go
@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/p2p/protobuf"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/pullsync/pb"
"github.com/ethersphere/bee/v2/pkg/ratelimit"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer"
@@ -45,10 +46,12 @@
)

const (
MaxCursor = math.MaxUint64
DefaultMaxPage uint64 = 250
pageTimeout = time.Second
makeOfferTimeout = 15 * time.Minute
MaxCursor = math.MaxUint64
DefaultMaxPage uint64 = 250
pageTimeout = time.Second
makeOfferTimeout = 15 * time.Minute
handleMaxChunksPerSecond = 100
handleRequestsLimitRate = time.Second / handleMaxChunksPerSecond // handle max 100 chunks per second per peer
)

// Interface is the PullSync interface.
@@ -74,6 +77,8 @@ type Syncer struct {

maxPage uint64

limiter *ratelimit.Limiter

Interface
io.Closer
}
@@ -96,6 +101,7 @@ func New(
logger: logger.WithName(loggerName).Register(),
quit: make(chan struct{}),
maxPage: maxPage,
limiter: ratelimit.New(handleRequestsLimitRate, int(maxPage)),
}
}

@@ -113,9 +119,109 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec {
Handler: s.cursorHandler,
},
},
DisconnectIn: s.disconnect,
DisconnectOut: s.disconnect,
}
}

// handler handles an incoming request to sync an interval
func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {

select {
case <-s.quit:
return nil
default:
s.syncInProgress.Add(1)
defer s.syncInProgress.Add(-1)
}

r := protobuf.NewReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()

ctx, cancel := context.WithCancel(streamCtx)
defer cancel()

go func() {
select {
case <-s.quit:
cancel()
case <-ctx.Done():
return
}
}()

var rn pb.Get
if err := r.ReadMsgWithContext(ctx, &rn); err != nil {
return fmt.Errorf("read get range: %w", err)
}

// recreate the reader to allow the first one to be garbage collected
// before the makeOffer function call, to reduce the total memory allocated
// while makeOffer is executing (waiting for the new chunks)
w, r := protobuf.NewWriterAndReader(stream)

// make an offer to the upstream peer in return for the requested range
offer, err := s.makeOffer(ctx, rn)
if err != nil {
return fmt.Errorf("make offer: %w", err)
}

if err := w.WriteMsgWithContext(ctx, offer); err != nil {
return fmt.Errorf("write offer: %w", err)
}

// we don't have any hashes to offer in this range (the
// interval is empty). nothing more to do
if len(offer.Chunks) == 0 {
return nil
}

s.metrics.SentOffered.Add(float64(len(offer.Chunks)))

var want pb.Want
if err := r.ReadMsgWithContext(ctx, &want); err != nil {
return fmt.Errorf("read want: %w", err)
}

chs, err := s.processWant(ctx, offer, &want)
if err != nil {
return fmt.Errorf("process want: %w", err)
}

for _, c := range chs {
var stamp []byte
if c.Stamp() != nil {
stamp, err = c.Stamp().MarshalBinary()
if err != nil {
return fmt.Errorf("serialise stamp: %w", err)
}
}

deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp}
if err := w.WriteMsgWithContext(ctx, &deliver); err != nil {
return fmt.Errorf("write delivery: %w", err)
}
s.metrics.Sent.Inc()
}

// slow down future requests
waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
if err != nil {
return fmt.Errorf("rate limiter: %w", err)
}
if waitDur > 0 {
s.logger.Debug("rate limited peer", "wait_duration", waitDur, "peer_address", p.Address)
}

return nil
}

// Sync syncs a batch of chunks starting at a start BinID.
// It returns the BinID of highest chunk that was synced from the given
// batch and the total number of chunks the downstream peer has sent.
@@ -283,94 +389,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
return topmost, chunksPut, chunkErr
}

// handler handles an incoming request to sync an interval
func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
select {
case <-s.quit:
return nil
default:
s.syncInProgress.Add(1)
defer s.syncInProgress.Add(-1)
}

r := protobuf.NewReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()

ctx, cancel := context.WithCancel(streamCtx)
defer cancel()

go func() {
select {
case <-s.quit:
cancel()
case <-ctx.Done():
return
}
}()

var rn pb.Get
if err := r.ReadMsgWithContext(ctx, &rn); err != nil {
return fmt.Errorf("read get range: %w", err)
}

// recreate the reader to allow the first one to be garbage collected
// before the makeOffer function call, to reduce the total memory allocated
// while makeOffer is executing (waiting for the new chunks)
w, r := protobuf.NewWriterAndReader(stream)

// make an offer to the upstream peer in return for the requested range
offer, err := s.makeOffer(ctx, rn)
if err != nil {
return fmt.Errorf("make offer: %w", err)
}

if err := w.WriteMsgWithContext(ctx, offer); err != nil {
return fmt.Errorf("write offer: %w", err)
}

// we don't have any hashes to offer in this range (the
// interval is empty). nothing more to do
if len(offer.Chunks) == 0 {
return nil
}

s.metrics.SentOffered.Add(float64(len(offer.Chunks)))

var want pb.Want
if err := r.ReadMsgWithContext(ctx, &want); err != nil {
return fmt.Errorf("read want: %w", err)
}

chs, err := s.processWant(ctx, offer, &want)
if err != nil {
return fmt.Errorf("process want: %w", err)
}

for _, c := range chs {
var stamp []byte
if c.Stamp() != nil {
stamp, err = c.Stamp().MarshalBinary()
if err != nil {
return fmt.Errorf("serialise stamp: %w", err)
}
}

deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp}
if err := w.WriteMsgWithContext(ctx, &deliver); err != nil {
return fmt.Errorf("write delivery: %w", err)
}
s.metrics.Sent.Inc()
}

return nil
}

// makeOffer tries to assemble an offer for a given requested interval.
func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) {

@@ -552,6 +570,11 @@ func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea
return nil
}

func (s *Syncer) disconnect(peer p2p.Peer) error {
s.limiter.Clear(peer.Address.ByteString())
return nil
}

func (s *Syncer) Close() error {
s.logger.Info("pull syncer shutting down")
close(s.quit)
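The new `DisconnectIn`/`DisconnectOut` hooks above call `disconnect`, which clears the peer's bucket, presumably so the per-peer limiter map does not keep state for peers that have gone away. A tiny sketch of that lifecycle; the `buckets` type is hypothetical, while the real map lives inside pkg/ratelimit.Limiter behind the same kind of mutex:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type buckets struct {
	mtx sync.Mutex
	m   map[string]*rate.Limiter
}

// clear mirrors Limiter.Clear: drop the disconnected peer's bucket under the lock.
func (b *buckets) clear(peer string) {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	delete(b.m, peer)
}

func main() {
	b := &buckets{m: map[string]*rate.Limiter{
		"peer-a": rate.NewLimiter(rate.Every(time.Second/100), 250),
	}}
	fmt.Println("tracked peers:", len(b.m)) // 1

	// Called from the protocol's disconnect hook.
	b.clear("peer-a")
	fmt.Println("tracked peers:", len(b.m)) // 0
}
```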
30 changes: 24 additions & 6 deletions pkg/ratelimit/ratelimit.go
@@ -8,6 +8,7 @@
package ratelimit

import (
"context"
"sync"
"time"

@@ -32,24 +33,41 @@

// Allow checks if the limiter that belongs to 'key' has not exceeded the limit.
func (l *Limiter) Allow(key string, count int) bool {
return l.getLimiter(key).AllowN(time.Now(), count)
}

// Wait blocks until the limiter permits n events to happen. Returns the time duration
// the limiter waited for to allow the number of events to occur.
func (l *Limiter) Wait(ctx context.Context, key string, count int) (time.Duration, error) {
limiter := l.getLimiter(key)

n := time.Now()

if limiter.AllowN(n, count) {
return 0, nil
}

err := limiter.WaitN(ctx, count)

return time.Since(n), err
}

// Clear deletes the limiter that belongs to 'key'
func (l *Limiter) getLimiter(key string) *rate.Limiter {
l.mtx.Lock()
defer l.mtx.Unlock()

limiter, ok := l.limiter[key]
if !ok {
limiter = rate.NewLimiter(l.rate, l.burst)
l.limiter[key] = limiter
}

// We are intentionally not defer calling Unlock in order to reduce locking extent.
// Individual limiter is capable for handling concurrent calls.
l.mtx.Unlock()

return limiter.AllowN(time.Now(), count)
return limiter
}

// Clear deletes the limiter that belongs to 'key'
func (l *Limiter) Clear(key string) {

l.mtx.Lock()
defer l.mtx.Unlock()

Expand Down
(diffs for the 2 remaining changed files did not load)
