Skip to content

Commit

Permalink
Make bootstrapping handle its own timeouts
Browse files Browse the repository at this point in the history
Currently, an engine registers timeouts with the handler, which schedules the timeouts on behalf of the engine.
The handler then notifies the engine when a timeout expires.

However, the only engine that uses this mechanism is the bootstrapping engine, and not the other engine types such as the snowman and state sync engines.

It therefore makes sense to consolidate the timeout handling in the bootstrapper instead of delegating it to the handler.

By moving the timeout handling closer to the bootstrapper, we can slim down the common.Engine API by removing the Timeout() method from it.

Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Sep 25, 2024
1 parent 1b6288f commit c25a68d
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 135 deletions.
2 changes: 0 additions & 2 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,6 @@ func (m *manager) createAvalancheChain(
StartupTracker: startupTracker,
Sender: snowmanMessageSender,
BootstrapTracker: sb,
Timer: h,
PeerTracker: peerTracker,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
DB: blockBootstrappingDB,
Expand Down Expand Up @@ -1357,7 +1356,6 @@ func (m *manager) createSnowmanChain(
StartupTracker: startupTracker,
Sender: messageSender,
BootstrapTracker: sb,
Timer: h,
PeerTracker: peerTracker,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
DB: bootstrappingDB,
Expand Down
18 changes: 0 additions & 18 deletions message/internal_msg_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
var (
disconnected = &Disconnected{}
gossipRequest = &GossipRequest{}
timeout = &Timeout{}

_ fmt.Stringer = (*GetStateSummaryFrontierFailed)(nil)
_ chainIDGetter = (*GetStateSummaryFrontierFailed)(nil)
Expand Down Expand Up @@ -50,8 +49,6 @@ var (
_ fmt.Stringer = (*Disconnected)(nil)

_ fmt.Stringer = (*GossipRequest)(nil)

_ fmt.Stringer = (*Timeout)(nil)
)

type GetStateSummaryFrontierFailed struct {
Expand Down Expand Up @@ -391,18 +388,3 @@ func InternalGossipRequest(
expiration: mockable.MaxTime,
}
}

// Timeout is the payload carried by the internal timeout message built by
// InternalTimeout. It has no fields.
type Timeout struct{}

// String implements fmt.Stringer. A Timeout has nothing to print, so the
// result is always the empty string.
func (Timeout) String() string {
return ""
}

// InternalTimeout builds the inbound message that represents a fired timeout.
//
// The message is attributed to nodeID, uses the TimeoutOp op code, carries the
// shared package-level timeout payload, and is given the maximum expiration
// time.
func InternalTimeout(nodeID ids.NodeID) InboundMessage {
	msg := new(inboundMessage)
	msg.nodeID = nodeID
	msg.op = TimeoutOp
	msg.message = timeout
	msg.expiration = mockable.MaxTime
	return msg
}
5 changes: 3 additions & 2 deletions snow/engine/common/bootstrap_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type BootstrapTracker interface {
// Returns true iff done bootstrapping
IsBootstrapped() bool

// Bootstrapped marks the named chain as being bootstrapped
// Bootstrapped marks the named chain as being bootstrapped.
Bootstrapped(chainID ids.ID)

OnBootstrapCompleted() chan struct{}
// AllBootstrapped returns a channel that is closed when all chains have been bootstrapped.
AllBootstrapped() <-chan struct{}
}
3 changes: 0 additions & 3 deletions snow/engine/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,6 @@ type InternalHandler interface {
// Notify this engine of peer changes.
validators.Connector

// Notify this engine that a registered timeout has fired.
Timeout(context.Context) error

// Gossip to the network a container on the accepted frontier
Gossip(context.Context) error

Expand Down
83 changes: 79 additions & 4 deletions snow/engine/common/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,87 @@

package common

import "time"
import (
"sync"
"time"
)

// Timer describes the standard interface for specifying a timeout
type Timer interface {
// PreemptionSignal is a one-shot broadcast that tells a timeout handler to stop
// waiting for its timers. The zero value is ready to use; the underlying
// channel is allocated lazily on the first call to Listen or Preempt.
type PreemptionSignal struct {
	activateOnce sync.Once
	initOnce     sync.Once
	signal       chan struct{}
}

// init allocates the signal channel. It is only run through initOnce.
func (ps *PreemptionSignal) init() {
	ps.signal = make(chan struct{})
}

// lazySignal returns the signal channel, allocating it on first use.
func (ps *PreemptionSignal) lazySignal() chan struct{} {
	ps.initOnce.Do(ps.init)
	return ps.signal
}

// Listen returns a receive-only channel that is closed once Preempt is invoked.
// Every call returns the same channel.
func (ps *PreemptionSignal) Listen() <-chan struct{} {
	return ps.lazySignal()
}

// Preempt closes the channel returned by every past and future call to Listen.
// Calling Preempt more than once is a no-op after the first call.
func (ps *PreemptionSignal) Preempt() {
	ch := ps.lazySignal()
	ps.activateOnce.Do(func() {
		close(ch)
	})
}

// timeoutHandler runs a callback when a registered timeout elapses or when its
// preemption signal is closed, whichever happens first.
type timeoutHandler struct {
	// newTimer creates a timer for the given duration and returns its
	// expiry channel together with a function that stops it. It is a field so
	// that tests can substitute their own clock.
	newTimer func(time.Duration) (<-chan time.Time, func() bool)
	// timeouts has capacity one and holds a token while a callback invocation
	// is in flight.
	timeouts chan struct{}
	// onTimeout invokes the user callback and then releases the token held in
	// timeouts.
	onTimeout func()
	// preemptionSignal, once closed, makes every pending timeout stop waiting.
	preemptionSignal <-chan struct{}
}

// NewTimeoutHandler constructs a timeout handler that invokes onTimeout when a
// registered timeout expires. If preemptionSignal is closed, pending and future
// timeouts stop waiting and onTimeout is invoked immediately instead.
func NewTimeoutHandler(onTimeout func(), preemptionSignal <-chan struct{}) *timeoutHandler {
	inFlight := make(chan struct{}, 1)

	// Release the in-flight token only after the user callback has returned.
	notify := func() {
		onTimeout()
		<-inFlight
	}
	wallClockTimer := func(delay time.Duration) (<-chan time.Time, func() bool) {
		t := time.NewTimer(delay)
		return t.C, t.Stop
	}

	th := new(timeoutHandler)
	th.newTimer = wallClockTimer
	th.timeouts = inFlight
	th.onTimeout = notify
	th.preemptionSignal = preemptionSignal
	return th
}

// RegisterTimeout invokes the function the timeout handler was initialized with
// no later than d from now. The wait happens on a new goroutine, so this call
// returns immediately.
func (th *timeoutHandler) RegisterTimeout(d time.Duration) {
	go th.awaitAndFire(d)
}

// awaitAndFire blocks until either d elapses or the preemption signal is
// closed, and then invokes the callback unless another invocation is already
// in flight.
func (th *timeoutHandler) awaitAndFire(d time.Duration) {
	expired, cancel := th.newTimer(d)
	defer cancel()

	select {
	case <-th.preemptionSignal:
	case <-expired:
	}

	// If there is already a timeout ready to fire - just drop the
	// additional timeout. This ensures that all goroutines that are spawned
	// here are able to close if the chain is shutdown.
	select {
	case th.timeouts <- struct{}{}:
	default:
		return
	}
	th.onTimeout()
}

// TimeoutRegistry describes the standard interface for specifying a timeout.
type TimeoutRegistry interface {
// RegisterTimeout specifies how much time to delay the next timeout
// notification by.
//
// With the timeoutHandler implementation, the notification fires immediately
// once the preemption signal the handler was constructed with is closed (for
// example via PreemptionSignal.Preempt), even if the delay has not elapsed.
// The bootstrapper passes BootstrapTracker.AllBootstrapped() as that signal.
RegisterTimeout(time.Duration)
}
60 changes: 60 additions & 0 deletions snow/engine/common/timer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package common

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestTimeoutHandler checks that the callback given to NewTimeoutHandler runs
// exactly once, both when the timer expires and when the preemption signal is
// raised long before the timer would expire.
func TestTimeoutHandler(t *testing.T) {
	type scenario struct {
		desc  string
		clock func(time.Duration) (<-chan time.Time, func() bool)
		body  func(*timeoutHandler, *sync.WaitGroup, *PreemptionSignal)
	}

	realClock := func(d time.Duration) (<-chan time.Time, func() bool) {
		timer := time.NewTimer(d)
		return timer.C, timer.Stop
	}

	scenarios := []scenario{
		{
			desc:  "Function invoked upon timeout",
			clock: realClock,
			body: func(handler *timeoutHandler, done *sync.WaitGroup, _ *PreemptionSignal) {
				handler.RegisterTimeout(time.Millisecond)
				done.Wait()
			},
		},
		{
			// No clock override: the handler keeps its default timer, which
			// cannot expire within the test, so only preemption can fire it.
			desc: "Preemption makes the function be invoked immediately despite no clock tick",
			body: func(handler *timeoutHandler, done *sync.WaitGroup, preemption *PreemptionSignal) {
				handler.RegisterTimeout(time.Hour)
				preemption.Preempt()
				done.Wait()
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.desc, func(t *testing.T) {
			var (
				done       sync.WaitGroup
				preemption PreemptionSignal
				calls      int
			)
			done.Add(1)

			handler := NewTimeoutHandler(func() {
				calls++
				done.Done()
			}, preemption.Listen())
			if sc.clock != nil {
				handler.newTimer = sc.clock
			}

			// body returns only after the callback has signalled done, which
			// orders the write to calls before the read below.
			sc.body(handler, &done, &preemption)
			require.Equal(t, 1, calls)
		})
	}
}
7 changes: 0 additions & 7 deletions snow/engine/common/traced_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,6 @@ func (e *tracedEngine) Disconnected(ctx context.Context, nodeID ids.NodeID) erro
return e.engine.Disconnected(ctx, nodeID)
}

// Timeout wraps the underlying engine's Timeout call in a tracing span named
// "tracedEngine.Timeout" and returns whatever the underlying engine returns.
func (e *tracedEngine) Timeout(ctx context.Context) error {
	spanCtx, span := e.tracer.Start(ctx, "tracedEngine.Timeout")
	// End the span only after the wrapped call has returned.
	defer span.End()

	err := e.engine.Timeout(spanCtx)
	return err
}

func (e *tracedEngine) Gossip(ctx context.Context) error {
ctx, span := e.tracer.Start(ctx, "tracedEngine.Gossip")
defer span.End()
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/enginetest/bootstrap_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *BootstrapTracker) Bootstrapped(chainID ids.ID) {
}
}

func (s *BootstrapTracker) OnBootstrapCompleted() chan struct{} {
func (s *BootstrapTracker) AllBootstrapped() <-chan struct{} {
if s.OnBootstrapCompletedF != nil {
return s.OnBootstrapCompletedF()
} else if s.CantOnBootstrapCompleted && s.T != nil {
Expand Down
14 changes: 0 additions & 14 deletions snow/engine/enginetest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
)

var (
errTimeout = errors.New("unexpectedly called Timeout")
errGossip = errors.New("unexpectedly called Gossip")
errNotify = errors.New("unexpectedly called Notify")
errGetStateSummaryFrontier = errors.New("unexpectedly called GetStateSummaryFrontier")
Expand Down Expand Up @@ -189,19 +188,6 @@ func (e *Engine) Start(ctx context.Context, startReqID uint32) error {
return errStart
}

// Timeout is the test double for the engine's Timeout method.
//
// If TimeoutF is set, the call is delegated to it. Otherwise the call succeeds
// silently unless CantTimeout is set, in which case the test T (when present)
// is failed immediately and errTimeout is returned.
func (e *Engine) Timeout(ctx context.Context) error {
	switch {
	case e.TimeoutF != nil:
		return e.TimeoutF(ctx)
	case !e.CantTimeout:
		return nil
	}

	if e.T != nil {
		require.FailNow(e.T, errTimeout.Error())
	}
	return errTimeout
}

func (e *Engine) Gossip(ctx context.Context) error {
if e.GossipF != nil {
return e.GossipF(ctx)
Expand Down
9 changes: 0 additions & 9 deletions snow/engine/enginetest/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/snow/engine/common"
)

var _ common.Timer = (*Timer)(nil)

// Timer is a test timer
type Timer struct {
T *testing.T
Expand All @@ -23,11 +19,6 @@ type Timer struct {
RegisterTimeoutF func(time.Duration)
}

// Default sets the CantRegisterTimout flag to [cant].
func (t *Timer) Default(cant bool) {
t.CantRegisterTimout = cant
}

func (t *Timer) RegisterTimeout(delay time.Duration) {
if t.RegisterTimeoutF != nil {
t.RegisterTimeoutF(delay)
Expand Down
23 changes: 16 additions & 7 deletions snow/engine/snowman/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Bootstrapper struct {
Config
shouldHalt func() bool
*metrics

TimeoutRegistry common.TimeoutRegistry
// list of NoOpsHandler for messages dropped by bootstrapper
common.StateSummaryFrontierHandler
common.AcceptedStateSummaryHandler
Expand Down Expand Up @@ -119,7 +119,7 @@ type Bootstrapper struct {

func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) error) (*Bootstrapper, error) {
metrics, err := newMetrics(config.Ctx.Registerer)
return &Bootstrapper{
bs := &Bootstrapper{
shouldHalt: config.ShouldHalt,
nonVerifyingParser: config.NonVerifyingParse,
Config: config,
Expand All @@ -139,7 +139,16 @@ func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) e

executedStateTransitions: math.MaxInt,
onFinished: onFinished,
}, err
}

// timeout is the callback handed to the timeout handler below. The handler
// invokes it with no way to return an error, so a failure from Timeout is
// logged here instead of being propagated.
timeout := func() {
	if err := bs.Timeout(); err != nil {
		// The log message is not a format string, so a "%w" verb would be
		// printed literally. The error is attached as a structured field.
		bs.Config.Ctx.Log.Warn("Encountered error during bootstrapping", zap.Error(err))
	}
}
bs.TimeoutRegistry = common.NewTimeoutHandler(timeout, config.BootstrapTracker.AllBootstrapped())

return bs, err
}

func (b *Bootstrapper) Context() *snow.ConsensusContext {
Expand Down Expand Up @@ -703,8 +712,8 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error {
log("waiting for the remaining chains in this subnet to finish syncing")
// Restart bootstrapping after [bootstrappingDelay] to keep up to date
// on the latest tip.
b.Config.Timer.RegisterTimeout(bootstrappingDelay)
b.awaitingTimeout = true
b.TimeoutRegistry.RegisterTimeout(bootstrappingDelay)
return nil
}
return b.onFinished(ctx, b.requestID)
Expand All @@ -722,16 +731,16 @@ func (b *Bootstrapper) getLastAccepted(ctx context.Context) (snowman.Block, erro
return lastAccepted, nil
}

func (b *Bootstrapper) Timeout(ctx context.Context) error {
func (b *Bootstrapper) Timeout() error {
if !b.awaitingTimeout {
return errUnexpectedTimeout
}
b.awaitingTimeout = false

if !b.Config.BootstrapTracker.IsBootstrapped() {
return b.restartBootstrapping(ctx)
return b.restartBootstrapping(context.TODO())
}
return b.onFinished(ctx, b.requestID)
return b.onFinished(context.TODO(), b.requestID)
}

func (b *Bootstrapper) restartBootstrapping(ctx context.Context) error {
Expand Down
Loading

0 comments on commit c25a68d

Please sign in to comment.