feat(puller): remove historical syncing and run a single syncer (#4196)
istae authored Jul 4, 2023
1 parent cf7e92c commit 389e7f8
Showing 4 changed files with 62 additions and 169 deletions.
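
In outline, this commit collapses the puller's two per-(peer, bin) workers, histSyncWorker and liveSyncWorker, into a single syncWorker that always resumes from the next interval missing from the state store. A condensed sketch of the new loop, paraphrased from the pkg/puller/puller.go hunk below (metrics, logging, and the cancellation check are trimmed, so this is not the verbatim code):

	for {
		// Resume from the first interval not yet synced for this peer and bin.
		s, _, _, err := p.nextPeerInterval(peer, bin)
		if err != nil {
			return
		}
		top, count, err := p.syncer.Sync(ctx, peer, bin, s)
		// Only intervals that end at or below the peer's start cursor
		// count toward the historical sync rate.
		if top <= cur {
			p.rate.Add(count)
		}
		// Persist progress so the next iteration starts past top.
		if top >= s {
			if err := p.addPeerInterval(peer, bin, s, top); err != nil {
				return
			}
		}
		if errors.Is(err, p2p.ErrPeerNotFound) {
			return
		}
	}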
pkg/node/node.go (2 changes: 1 addition & 1 deletion)
@@ -1032,7 +1032,7 @@ func NewBee(
 	)
 
 	if o.FullNodeMode && !o.BootnodeMode {
-		pullerService = puller.New(stateStore, kad, localStore, pullSyncProtocol, p2ps, logger, puller.Options{}, warmupTime)
+		pullerService = puller.New(stateStore, kad, localStore, pullSyncProtocol, p2ps, logger, puller.Options{})
 		b.pullerCloser = pullerService
 
 		localStore.StartReserveWorker(ctx, pullerService, networkRadiusFunc)
pkg/puller/metrics.go (53 changes: 12 additions & 41 deletions)
@@ -10,76 +10,47 @@ import (
 )
 
 type metrics struct {
-	HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations
-	HistWorkerCounter     prometheus.Counter // count number of historical syncing jobs
-	HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs
-	HistWorkerErrCounter  prometheus.Counter // count number of errors
-	LiveWorkerCounter     prometheus.Counter // count number of live syncing jobs
-	LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations
-	LiveWorkerErrCounter  prometheus.Counter // count number of errors
+	SyncWorkerIterCounter prometheus.Counter // counts the number of syncing iterations
+	SyncWorkerCounter     prometheus.Counter // count number of syncing jobs
+	SyncWorkerDoneCounter prometheus.Counter // count number of finished syncing jobs
+	SyncWorkerErrCounter  prometheus.Counter // count number of errors
 	MaxUintErrCounter     prometheus.Counter // how many times we got maxuint as topmost
-	HistSyncTimeout       prometheus.Counter // counts the number of historical syncing timeouts
 }
 
 func newMetrics() metrics {
 	subsystem := "puller"
 
 	return metrics{
-		HistWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{
+		SyncWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "hist_worker_iterations",
+			Name:      "worker_iterations",
 			Help:      "Total history worker iterations.",
 		}),
-		HistWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{
+		SyncWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "hist_worker",
+			Name:      "worker",
 			Help:      "Total history active worker jobs.",
 		}),
-		HistWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{
+		SyncWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "hist_worker_done",
+			Name:      "worker_done",
 			Help:      "Total history worker jobs done.",
 		}),
-		HistWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
+		SyncWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "hist_worker_errors",
+			Name:      "worker_errors",
 			Help:      "Total history worker errors.",
 		}),
-		LiveWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{
-			Namespace: m.Namespace,
-			Subsystem: subsystem,
-			Name:      "live_worker",
-			Help:      "Total live active worker jobs.",
-		}),
-		LiveWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{
-			Namespace: m.Namespace,
-			Subsystem: subsystem,
-			Name:      "live_worker_iterations",
-			Help:      "Total live worker iterations.",
-		}),
-		LiveWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
-			Namespace: m.Namespace,
-			Subsystem: subsystem,
-			Name:      "live_worker_errors",
-			Help:      "Total live worker errors.",
-		}),
 		MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
 			Subsystem: subsystem,
 			Name:      "max_uint_errors",
 			Help:      "Total max uint errors.",
 		}),
-		HistSyncTimeout: prometheus.NewCounter(prometheus.CounterOpts{
-			Namespace: m.Namespace,
-			Subsystem: subsystem,
-			Name:      "hist_sync_timeout",
-			Help:      "Count of timeouts of historical sync request.",
-		}),
 	}
 }

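With the subsystem fixed at "puller", the renamed counters should be exported under names like the following (a sketch of the resulting metric names; the bee_ prefix assumes m.Namespace resolves to "bee", which is defined in the shared metrics package rather than in this diff):

	bee_puller_worker_iterations
	bee_puller_worker
	bee_puller_worker_done
	bee_puller_worker_errors
	bee_puller_max_uint_errors
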
pkg/puller/puller.go (170 changes: 47 additions & 123 deletions)
@@ -35,12 +35,8 @@ var errCursorsLength = errors.New("cursors length mismatch")
 const (
 	DefaultHistRateWindow = time.Minute * 15
 
-	intervalPrefix           = "sync_interval"
-	recalcPeersDur           = time.Minute * 5
-	histSyncTimeout          = time.Minute * 10 // this timeout should always be higher than pullsync.makeOfferTimeout
-	histSyncTimeoutBlockList = time.Hour
-
-	maxHistSyncs = swarm.MaxBins * 3
+	intervalPrefix = "sync_interval"
+	recalcPeersDur = time.Minute * 5
 )
 
 type Options struct {
@@ -67,9 +63,8 @@ type Puller struct {
 
 	bins uint8 // how many bins do we support
 
-	histSync        *atomic.Uint64 // current number of goroutines doing historical syncing
-	histSyncLimiter chan struct{}  // historical syncing limiter
-	rate            *rate.Rate     // rate of historical syncing
+	sync *atomic.Uint64 // current number of goroutines doing historical syncing
+	rate *rate.Rate     // rate of historical syncing
 
 	start sync.Once
 }
Expand All @@ -82,28 +77,24 @@ func New(
blockLister p2p.Blocklister,
logger log.Logger,
o Options,
warmupTime time.Duration,
) *Puller {
var (
bins uint8 = swarm.MaxBins
)
bins := swarm.MaxBins
if o.Bins != 0 {
bins = o.Bins
}
p := &Puller{
statestore: stateStore,
topology: topology,
radius: reserveState,
syncer: pullSync,
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
syncPeers: make(map[string]*syncPeer),
bins: bins,
histSync: atomic.NewUint64(0),
blockLister: blockLister,
histSyncLimiter: make(chan struct{}, maxHistSyncs),
rate: rate.New(DefaultHistRateWindow),
cancel: func() { /* Noop, since the context is initialized in the Start(). */ },
statestore: stateStore,
topology: topology,
radius: reserveState,
syncer: pullSync,
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
syncPeers: make(map[string]*syncPeer),
bins: bins,
sync: atomic.NewUint64(0),
blockLister: blockLister,
rate: rate.New(DefaultHistRateWindow),
cancel: func() { /* Noop, since the context is initialized in the Start(). */ },
}

return p
Expand All @@ -120,7 +111,7 @@ func (p *Puller) Start(ctx context.Context) {
}

func (p *Puller) ActiveHistoricalSyncing() uint64 {
return p.histSync.Load()
return p.sync.Load()
}

func (p *Puller) SyncRate() float64 {
@@ -272,131 +263,64 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
 func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur uint64) {
 	binCtx, cancel := context.WithCancel(ctx)
 	peer.setBinCancel(cancel, bin)
-	if cur > 0 {
-		p.wg.Add(1)
-		p.histSync.Inc()
-		go p.histSyncWorker(binCtx, peer.address, bin, cur)
-	}
-	// start live
 	p.wg.Add(1)
-	go p.liveSyncWorker(binCtx, peer.address, bin, cur)
+	p.sync.Inc()
+	go p.syncWorker(binCtx, peer.address, bin, cur)
 }
 
-func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
+func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
 	loggerV2 := p.logger.V(2).Register()
 
+	p.metrics.SyncWorkerCounter.Inc()
 	defer p.wg.Done()
-	defer p.metrics.HistWorkerDoneCounter.Inc()
-	defer p.histSync.Dec()
-
-	loopStart := time.Now()
-	loggerV2.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)
-
-	sync := func() bool {
-		s, _, _, err := p.nextPeerInterval(peer, bin)
-		if err != nil {
-			p.metrics.HistWorkerErrCounter.Inc()
-			p.logger.Error(err, "histSyncWorker nextPeerInterval failed, quitting...")
-			return true
-		}
-		if s > cur {
-			p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur, "total_duration", time.Since(loopStart), "peer_address", peer)
-			return true
-		}
-
-		syncStart := time.Now()
-		ctx, cancel := context.WithTimeout(ctx, histSyncTimeout)
-		top, count, err := p.syncer.Sync(ctx, peer, bin, s)
-		cancel()
-
-		p.rate.Add(count)
+	defer p.metrics.SyncWorkerDoneCounter.Inc()
+	defer p.sync.Dec()
 
-		if top >= s {
-			if err := p.addPeerInterval(peer, bin, s, top); err != nil {
-				p.metrics.HistWorkerErrCounter.Inc()
-				p.logger.Error(err, "histSyncWorker could not persist interval for peer", "peer_address", peer)
-				return false
-			}
-			loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", peer)
-		}
-
-		if err != nil {
-			p.metrics.HistWorkerErrCounter.Inc()
-			p.logger.Debug("histSyncWorker interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err)
-			// DeadlineExceeded err could come from any other context timeouts
-			// so we explicitly check for duration of the sync time
-			if errors.Is(err, context.DeadlineExceeded) && time.Since(syncStart) >= histSyncTimeout {
-				p.logger.Debug("histSyncWorker interval timeout, exiting", "total_duration", time.Since(loopStart), "peer_address", peer, "error", err)
-				_ = p.blockLister.Blocklist(peer, histSyncTimeoutBlockList, "sync interval timeout")
-				p.metrics.HistSyncTimeout.Inc()
-				return true
-			}
-			if errors.Is(err, p2p.ErrPeerNotFound) {
-				return true
-			}
-		}
-
-		return false
-	}
+	loggerV2.Debug("syncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)
 
 	for {
-		p.metrics.HistWorkerIterCounter.Inc()
+		p.metrics.SyncWorkerIterCounter.Inc()
 
 		select {
 		case <-ctx.Done():
-			loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur)
+			loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin)
 			return
-		case p.histSyncLimiter <- struct{}{}:
-		}
-
-		stop := sync()
-
-		<-p.histSyncLimiter
-
-		if stop {
-			return
-		}
-	}
-}
-
-func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
-	loggerV2 := p.logger.V(2).Register()
-
-	defer p.wg.Done()
-	loggerV2.Debug("liveSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)
-	from := cur + 1
-
-	for {
-		p.metrics.LiveWorkerIterCounter.Inc()
-
-		select {
-		case <-ctx.Done():
-			loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur)
-			return
 		default:
 		}
 
-		top, _, err := p.syncer.Sync(ctx, peer, bin, from)
+		s, _, _, err := p.nextPeerInterval(peer, bin)
+		if err != nil {
+			p.metrics.SyncWorkerErrCounter.Inc()
+			p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting...")
+			return
+		}
+
+		syncStart := time.Now()
+		top, count, err := p.syncer.Sync(ctx, peer, bin, s)
 
 		if top == math.MaxUint64 {
 			p.metrics.MaxUintErrCounter.Inc()
-			p.logger.Error(nil, "liveSyncWorker max uint64 encountered, quitting", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
+			p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", peer, "bin", bin, "from", s, "topmost", top)
 			return
 		}
 
-		if top >= from {
-			if err := p.addPeerInterval(peer, bin, from, top); err != nil {
-				p.metrics.LiveWorkerErrCounter.Inc()
-				p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
-				continue
+		if top <= cur {
+			p.rate.Add(count)
+		}
+
+		if top >= s {
+			if err := p.addPeerInterval(peer, bin, s, top); err != nil {
+				p.metrics.SyncWorkerErrCounter.Inc()
+				p.logger.Error(err, "syncWorker could not persist interval for peer", "peer_address", peer)
+				return
 			}
-			loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)
-			from = top + 1
+			loggerV2.Debug("syncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", peer)
 		}
 
 		if err != nil {
-			p.metrics.LiveWorkerErrCounter.Inc()
-			p.logger.Debug("liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top, "err", err)
+			p.metrics.SyncWorkerErrCounter.Inc()
+			p.logger.Debug("syncWorker interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err)
 			if errors.Is(err, p2p.ErrPeerNotFound) {
 				return
 			}
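Two behavioral consequences are visible in the hunk above. First, ActiveHistoricalSyncing() now reports the number of running syncWorker goroutines, since syncPeerBin increments p.sync for every worker rather than only for workers below the cursor. Second, SyncRate() accumulates only when a synced interval ends at or below the peer's start cursor (top <= cur), so the rate still measures historical syncing even though one worker now handles both the historical and live phases.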
pkg/puller/puller_test.go (6 changes: 2 additions & 4 deletions)
@@ -33,7 +33,7 @@ func TestOneSync(t *testing.T) {
 		cursors = []uint64{1000, 1000, 1000}
 		replies = []mockps.SyncReply{
 			{Bin: 1, Start: 1, Topmost: 1000, Peer: addr},
-			{Bin: 2, Start: 1001, Topmost: 1001, Peer: addr}}
+			{Bin: 2, Start: 1, Topmost: 1001, Peer: addr}}
 	)
 
 	_, _, kad, pullsync := newPuller(t, opts{
@@ -52,7 +52,6 @@
 
 	waitCursorsCalled(t, pullsync, addr)
 	waitSyncCalledBins(t, pullsync, addr, 1, 2)
-	waitSync(t, pullsync, addr)
 }
 
 func TestSyncOutsideDepth(t *testing.T) {
@@ -92,7 +91,6 @@
 func TestSyncIntervals(t *testing.T) {
 	t.Parallel()
-	t.Skip("skip until we have a better way to test this")
 
 	addr := swarm.RandAddress(t)
 
@@ -536,7 +534,7 @@ func newPuller(t *testing.T, ops opts) (*puller.Puller, storage.StateStorer, *ka
 	o := puller.Options{
 		Bins: ops.bins,
 	}
-	p := puller.New(s, kad, ops.rs, ps, nil, logger, o, 0)
+	p := puller.New(s, kad, ops.rs, ps, nil, logger, o)
 	p.Start(context.Background())
 
 	testutil.CleanupCloser(t, p)
