diff --git a/pkg/node/node.go b/pkg/node/node.go index 9c603bbc3c4..0143f959fd4 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -1032,7 +1032,7 @@ func NewBee( ) if o.FullNodeMode && !o.BootnodeMode { - pullerService = puller.New(stateStore, kad, localStore, pullSyncProtocol, p2ps, logger, puller.Options{}, warmupTime) + pullerService = puller.New(stateStore, kad, localStore, pullSyncProtocol, p2ps, logger, puller.Options{}) b.pullerCloser = pullerService localStore.StartReserveWorker(ctx, pullerService, networkRadiusFunc) diff --git a/pkg/puller/metrics.go b/pkg/puller/metrics.go index 64fe0bf3722..7f81271feda 100644 --- a/pkg/puller/metrics.go +++ b/pkg/puller/metrics.go @@ -10,76 +10,47 @@ import ( ) type metrics struct { - HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations - HistWorkerCounter prometheus.Counter // count number of historical syncing jobs - HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs - HistWorkerErrCounter prometheus.Counter // count number of errors - LiveWorkerCounter prometheus.Counter // count number of live syncing jobs - LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations - LiveWorkerErrCounter prometheus.Counter // count number of errors + SyncWorkerIterCounter prometheus.Counter // counts the number of syncing iterations + SyncWorkerCounter prometheus.Counter // count number of syncing jobs + SyncWorkerDoneCounter prometheus.Counter // count number of finished syncing jobs + SyncWorkerErrCounter prometheus.Counter // count number of errors MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost - HistSyncTimeout prometheus.Counter // counts the number of historical syncing timeouts } func newMetrics() metrics { subsystem := "puller" return metrics{ - HistWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{ + SyncWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "hist_worker_iterations", + Name: "worker_iterations", Help: "Total history worker iterations.", }), - - HistWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{ + SyncWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "hist_worker", + Name: "worker", Help: "Total history active worker jobs.", }), - HistWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{ + SyncWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "hist_worker_done", + Name: "worker_done", Help: "Total history worker jobs done.", }), - HistWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ + SyncWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "hist_worker_errors", + Name: "worker_errors", Help: "Total history worker errors.", }), - LiveWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "live_worker", - Help: "Total live active worker jobs.", - }), - LiveWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "live_worker_iterations", - Help: "Total live worker iterations.", - }), - LiveWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "live_worker_errors", - Help: "Total live worker errors.", - }), MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, Name: "max_uint_errors", Help: "Total max uint errors.", }), - HistSyncTimeout: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "hist_sync_timeout", - Help: "Count of timeouts of historical sync request.", - }), } } diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index c8c812cf86f..bedda5de596 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -35,12 +35,8 @@ var errCursorsLength = errors.New("cursors length mismatch") const ( DefaultHistRateWindow = time.Minute * 15 - intervalPrefix = "sync_interval" - recalcPeersDur = time.Minute * 5 - histSyncTimeout = time.Minute * 10 // this timeout should always be higher than pullsync.makeOfferTimeout - histSyncTimeoutBlockList = time.Hour - - maxHistSyncs = swarm.MaxBins * 3 + intervalPrefix = "sync_interval" + recalcPeersDur = time.Minute * 5 ) type Options struct { @@ -67,9 +63,8 @@ type Puller struct { bins uint8 // how many bins do we support - histSync *atomic.Uint64 // current number of gorourines doing historical syncing - histSyncLimiter chan struct{} // historical syncing limiter - rate *rate.Rate // rate of historical syncing + sync *atomic.Uint64 // current number of gorourines doing historical syncing + rate *rate.Rate // rate of historical syncing start sync.Once } @@ -82,28 +77,24 @@ func New( blockLister p2p.Blocklister, logger log.Logger, o Options, - warmupTime time.Duration, ) *Puller { - var ( - bins uint8 = swarm.MaxBins - ) + bins := swarm.MaxBins if o.Bins != 0 { bins = o.Bins } p := &Puller{ - statestore: stateStore, - topology: topology, - radius: reserveState, - syncer: pullSync, - metrics: newMetrics(), - logger: logger.WithName(loggerName).Register(), - syncPeers: make(map[string]*syncPeer), - bins: bins, - histSync: atomic.NewUint64(0), - blockLister: blockLister, - histSyncLimiter: make(chan struct{}, maxHistSyncs), - rate: rate.New(DefaultHistRateWindow), - cancel: func() { /* Noop, since the context is initialized in the Start(). */ }, + statestore: stateStore, + topology: topology, + radius: reserveState, + syncer: pullSync, + metrics: newMetrics(), + logger: logger.WithName(loggerName).Register(), + syncPeers: make(map[string]*syncPeer), + bins: bins, + sync: atomic.NewUint64(0), + blockLister: blockLister, + rate: rate.New(DefaultHistRateWindow), + cancel: func() { /* Noop, since the context is initialized in the Start(). */ }, } return p @@ -120,7 +111,7 @@ func (p *Puller) Start(ctx context.Context) { } func (p *Puller) ActiveHistoricalSyncing() uint64 { - return p.histSync.Load() + return p.sync.Load() } func (p *Puller) SyncRate() float64 { @@ -272,131 +263,64 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur uint64) { binCtx, cancel := context.WithCancel(ctx) peer.setBinCancel(cancel, bin) - if cur > 0 { - p.wg.Add(1) - p.histSync.Inc() - go p.histSyncWorker(binCtx, peer.address, bin, cur) - } - // start live p.wg.Add(1) - go p.liveSyncWorker(binCtx, peer.address, bin, cur) + p.sync.Inc() + go p.syncWorker(binCtx, peer.address, bin, cur) } -func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { +func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { loggerV2 := p.logger.V(2).Register() + p.metrics.SyncWorkerCounter.Inc() defer p.wg.Done() - defer p.metrics.HistWorkerDoneCounter.Inc() - defer p.histSync.Dec() - - loopStart := time.Now() - loggerV2.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) - - sync := func() bool { - s, _, _, err := p.nextPeerInterval(peer, bin) - if err != nil { - p.metrics.HistWorkerErrCounter.Inc() - p.logger.Error(err, "histSyncWorker nextPeerInterval failed, quitting...") - return true - } - if s > cur { - p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur, "total_duration", time.Since(loopStart), "peer_address", peer) - return true - } - - syncStart := time.Now() - ctx, cancel := context.WithTimeout(ctx, histSyncTimeout) - top, count, err := p.syncer.Sync(ctx, peer, bin, s) - cancel() - - p.rate.Add(count) + defer p.metrics.SyncWorkerDoneCounter.Inc() + defer p.sync.Dec() - if top >= s { - if err := p.addPeerInterval(peer, bin, s, top); err != nil { - p.metrics.HistWorkerErrCounter.Inc() - p.logger.Error(err, "histSyncWorker could not persist interval for peer", "peer_address", peer) - return false - } - loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", peer) - } - - if err != nil { - p.metrics.HistWorkerErrCounter.Inc() - p.logger.Debug("histSyncWorker interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err) - // DeadlineExceeded err could come from any other context timeouts - // so we explicitly check for duration of the sync time - if errors.Is(err, context.DeadlineExceeded) && time.Since(syncStart) >= histSyncTimeout { - p.logger.Debug("histSyncWorker interval timeout, exiting", "total_duration", time.Since(loopStart), "peer_address", peer, "error", err) - _ = p.blockLister.Blocklist(peer, histSyncTimeoutBlockList, "sync interval timeout") - p.metrics.HistSyncTimeout.Inc() - return true - } - if errors.Is(err, p2p.ErrPeerNotFound) { - return true - } - } - - return false - } + loggerV2.Debug("syncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) for { - p.metrics.HistWorkerIterCounter.Inc() select { case <-ctx.Done(): - loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) + loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin) return - case p.histSyncLimiter <- struct{}{}: + default: } - stop := sync() - - <-p.histSyncLimiter - - if stop { - return - } - } -} - -func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { - loggerV2 := p.logger.V(2).Register() - - defer p.wg.Done() - loggerV2.Debug("liveSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) - from := cur + 1 - - for { - p.metrics.LiveWorkerIterCounter.Inc() + p.metrics.SyncWorkerIterCounter.Inc() - select { - case <-ctx.Done(): - loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) + s, _, _, err := p.nextPeerInterval(peer, bin) + if err != nil { + p.metrics.SyncWorkerErrCounter.Inc() + p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting...") return - default: } - top, _, err := p.syncer.Sync(ctx, peer, bin, from) + syncStart := time.Now() + top, count, err := p.syncer.Sync(ctx, peer, bin, s) if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() - p.logger.Error(nil, "liveSyncWorker max uint64 encountered, quitting", "peer_address", peer, "bin", bin, "from", from, "topmost", top) + p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", peer, "bin", bin, "from", s, "topmost", top) return } - if top >= from { - if err := p.addPeerInterval(peer, bin, from, top); err != nil { - p.metrics.LiveWorkerErrCounter.Inc() - p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err) - continue + if top <= cur { + p.rate.Add(count) + } + + if top >= s { + if err := p.addPeerInterval(peer, bin, s, top); err != nil { + p.metrics.SyncWorkerErrCounter.Inc() + p.logger.Error(err, "syncWorker could not persist interval for peer", "peer_address", peer) + return } - loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) - from = top + 1 + loggerV2.Debug("syncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", peer) } if err != nil { - p.metrics.LiveWorkerErrCounter.Inc() - p.logger.Debug("liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top, "err", err) + p.metrics.SyncWorkerErrCounter.Inc() + p.logger.Debug("syncWorker interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top, "err", err) if errors.Is(err, p2p.ErrPeerNotFound) { return } diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 07d7a948b1d..0ed267d00d0 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -33,7 +33,7 @@ func TestOneSync(t *testing.T) { cursors = []uint64{1000, 1000, 1000} replies = []mockps.SyncReply{ {Bin: 1, Start: 1, Topmost: 1000, Peer: addr}, - {Bin: 2, Start: 1001, Topmost: 1001, Peer: addr}} + {Bin: 2, Start: 1, Topmost: 1001, Peer: addr}} ) _, _, kad, pullsync := newPuller(t, opts{ @@ -52,7 +52,6 @@ func TestOneSync(t *testing.T) { waitCursorsCalled(t, pullsync, addr) waitSyncCalledBins(t, pullsync, addr, 1, 2) - waitSync(t, pullsync, addr) } func TestSyncOutsideDepth(t *testing.T) { @@ -92,7 +91,6 @@ func TestSyncOutsideDepth(t *testing.T) { func TestSyncIntervals(t *testing.T) { t.Parallel() - t.Skip("skip until we have a better way to test this") addr := swarm.RandAddress(t) @@ -536,7 +534,7 @@ func newPuller(t *testing.T, ops opts) (*puller.Puller, storage.StateStorer, *ka o := puller.Options{ Bins: ops.bins, } - p := puller.New(s, kad, ops.rs, ps, nil, logger, o, 0) + p := puller.New(s, kad, ops.rs, ps, nil, logger, o) p.Start(context.Background()) testutil.CleanupCloser(t, p)