-
Notifications
You must be signed in to change notification settings - Fork 671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make bootstrapping handle its own timeouts #3410
base: master
Are you sure you want to change the base?
Conversation
e2641d7
to
09ad764
Compare
25122c6
to
b54e285
Compare
31d051e
to
c25a68d
Compare
snow/engine/common/timer.go
Outdated
} | ||
|
||
// RegisterTimeout fires the function the timeout handler is initialized with no later than the given timeout. | ||
func (th *timeoutHandler) RegisterTimeout(d time.Duration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved this code section from handler.go
snow/engine/common/timer.go
Outdated
newTimer: func(d time.Duration) (<-chan time.Time, func() bool) { | ||
timer := time.NewTimer(d) | ||
return timer.C, timer.Stop | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason for specifying this function here rather than just using time.NewTimer
directly?
It looks like we replace this with an identical implementation in one of our test cases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I needed it for the tests :-)
snow/engine/common/timer_test.go
Outdated
}, | ||
}, | ||
} { | ||
t.Run(testCase.desc, func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to remove clock
from these tests?
What do you think of adding test cases for multiple timeouts w/ and w/o preemption?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added two tests as requested, but I need the clock for these tests.
9be93a5
to
fe44d95
Compare
@@ -14,5 +14,6 @@ type BootstrapTracker interface { | |||
// Bootstrapped marks the named chain as being bootstrapped | |||
Bootstrapped(chainID ids.ID) | |||
|
|||
OnBootstrapCompleted() chan struct{} | |||
// AllBootstrapped returns a channel that is closed when all chains have been bootstrapped |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// AllBootstrapped returns a channel that is closed when all chains have been bootstrapped | |
// AllBootstrapped returns a channel that is closed when all chains in this | |
// subnet have been bootstrapped |
snow/engine/common/timer.go
Outdated
return ps.signal | ||
} | ||
|
||
// Preempt causes any past and future call of Listen return a closed channel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Preempt causes any past and future call of Listen return a closed channel. | |
// Preempt causes any past and future calls of Listen to return a closed channel. |
snow/engine/common/timer.go
Outdated
onTimeout: func() { | ||
onTimeout() | ||
<-timeoutBuff | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we wrap onTimeout
rather than just draining the channel when we call this function? The code that executes this function is already managing the channel right?
snow/engine/common/timer.go
Outdated
type timeoutHandler struct { | ||
newTimer func(time.Duration) (<-chan time.Time, func() bool) | ||
timeouts chan struct{} | ||
onTimeout func() | ||
preemptionSignal <-chan struct{} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been looking at the timeoutHandler
for the past 10m
and I'm still not really sure what this is doing.
Is all this complexity really needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it this way because I didn't want to change the original code pattern, and to make as minimal changes as possible.
I now re-implemented it. I hope it reads better and is easier to reason about.
snow/engine/common/timer.go
Outdated
// If there is already a timeout ready to fire - just drop the | ||
// additional timeout. This ensures that all goroutines that are spawned | ||
// here are able to close if the chain is shutdown. | ||
select { | ||
case th.timeouts <- struct{}{}: | ||
th.onTimeout() | ||
default: | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels racy to me. What are the expected guarantees around onTimeout
being called?
As this is currently written, it seems like a caller may call RegisterTimeout
and then never see an onTimeout
called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be at least a single single timeout called, as th.timeouts
is initialized to a buffered channel with a capacity of one.
If two are called concurrently, then there is an attempt to only call one.
This code was not made in this PR, I just moved it around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I re-implemented this part, and added a lock on the context lock in the timeout callback
timeout := func() { | ||
if err := bs.Timeout(); err != nil { | ||
bs.Config.Ctx.Log.Warn("Encountered error during bootstrapping: %w", zap.Error(err)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is going to be called in a separate goroutine without the context lock held. Right? I think that can cause some racy behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, you're saying that the current code synchronizes all the calls to Timeout and calls them via the handler, while this new code calls each function in its own goroutine, is that it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm now locking the context lock in the timeout callback. I am not thrilled about it, but since we already have a few dozens of places where we lock this context lock, i figured it makes sense to proceed than to revert and have a tighter coupling between the engine and the handler via the timeout feedback loop.
2d6f9b4
to
4f949b8
Compare
Currently, an engine registers timeouts into the handler, which schedules the timeouts on behalf of the the engine. The handler then notifies the engine when the timeout expired. However, the only engine that uses this mechanism is the bootstrapping engine, and not the other engine types such as the snowman and state sync engines. It therefore makes sense to consolidate the timeout handling instead of delegating them to the handler. By moving the timeout handling closer to the bootstrapper, we can make the API of the common.Engine be slimmer by removing the Timeout() method from it. Signed-off-by: Yacov Manevich <[email protected]>
4f949b8
to
bdf0063
Compare
snow/engine/common/timer.go
Outdated
if !preempted && !acquiredToken { | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there special handling around the preemption here? Specifically, I'm not sure why we allow multiple goroutines to be scheduled if we have been preempted. I think just having:
if !th.acquirePendingTimeoutToken() {
return
}
should be sufficient for our expected invariants.
If we did this, I think we could get rid of the acquiredToken
argument in scheduleTimeout
and remove the preempted
helper entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the behavior of the previous code scheduled a new goroutine regardless.
The trick in the bottom select doesn't work in most cases as it's very time reliant - it schedules a goroutine regardless, and then attempts to "cancel it" after the timeout.
func (h *handler) RegisterTimeout(d time.Duration) {
go func() {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-timer.C:
case <-h.preemptTimeouts:
}
// If there is already a timeout ready to fire - just drop the
// additional timeout. This ensures that all goroutines that are spawned
// here are able to close if the chain is shutdown.
select {
case h.timeouts <- struct{}{}:
default:
}
}()
}
I agree though I should probably take advantage of the opportunity and not replicate a pattern if it does not add any value.
timer := th.newTimer(d) | ||
defer timer.Stop() | ||
|
||
defer th.onTimeout() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I feel like this makes a bit more sense to just be called inline at the end of the function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inlined
snow/engine/common/timer.go
Outdated
} | ||
|
||
if acquiredToken { | ||
th.relinquishPendingTimeoutToken() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we comment why it is important to return the token prior to calling onTimeout
?
// Allow the next timeout to be scheduled. It is important that this is
// performed prior to invoking the timeout handler to allow the timeout
// handler (or another goroutine synchronized with the timeout handler) to
// schedule a future timeout and be guaranteed it will fire.
or something
snow/engine/common/timer.go
Outdated
// RegisterTimeout specifies how much time to delay the next timeout message | ||
// by. If the subnet has been bootstrapped, the timeout will fire | ||
// immediately. | ||
// immediately via calling Preempt(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we are here, it probably makes sense to expand on the interface comment a bit:
// RegisterTimeout specifies how much time to delay the next timeout message
// by.
//
// If there is already a pending timeout message, this call is a no-op.
// However, it is guaranteed that the timeout will fire at least once after
// calling this function.
//
// If the subnet has been bootstrapped, the timeout will fire immediately
// via calling Preempt().
snow/engine/common/timer.go
Outdated
pendingTimout := make(chan struct{}, 1) | ||
pendingTimout <- struct{}{} | ||
return &timeoutScheduler{ | ||
preemptionSignal: preemptionSignal, | ||
newTimer: newTimer, | ||
onTimeout: onTimeout, | ||
pendingTimeout: pendingTimout, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏 I think this is much cleaner than before - ty
snow/engine/common/timer.go
Outdated
|
||
// NewTimeoutScheduler constructs a new timeout scheduler with the given function to be invoked upon a timeout, | ||
// unless the preemptionSignal is closed and in which case it invokes the function immediately. | ||
func NewTimeoutScheduler(onTimeout func(), preemptionSignal <-chan struct{}, newTimer func(duration time.Duration) *time.Timer) *timeoutScheduler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
q: since we only ever specify newTimer
manually in tests in this package, would it make sense to remove this from the signature (initializing to time.NewTimer
in this constructer) and then override the newTimer
variable in the tests?
It seems like that would give a nicer UX for the prod code using this... But I don't have strong opinions here, just a small suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes makes sense. That was also how it was in the previous incarnation of this code.
snow/engine/common/timer_test.go
Outdated
newTimer := func(_ time.Duration) *time.Timer { | ||
// We use a duration of 0 to not leave a lingering timer | ||
// after the test finishes. | ||
// Then we replace the time channel to have control over the timer. | ||
timer := time.NewTimer(0) | ||
timer.C = clock | ||
return timer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make this a helper (since we define and use it twice?)
snow/engine/common/timer_test.go
Outdated
var wg sync.WaitGroup | ||
wg.Add(1) | ||
|
||
onTimeout := func() { | ||
wg.Done() | ||
} | ||
|
||
roChan := make(<-chan struct{}) | ||
|
||
ts := NewTimeoutScheduler(onTimeout, roChan, newTimer) | ||
|
||
ts.RegisterTimeout(time.Hour) // First timeout is registered | ||
ts.RegisterTimeout(time.Hour) // Second should not | ||
|
||
// Clock ticks are after registering, in order to ensure onTimeout() isn't fired until second registration is invoked. | ||
clock <- time.Now() | ||
clock <- time.Now() | ||
|
||
wg.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I wasn't sure what
roChan
meant, felt likepreemptChan
was a more clear name. - We can just pass
wg.Done
rather than wrapping it into another function.
var wg sync.WaitGroup | |
wg.Add(1) | |
onTimeout := func() { | |
wg.Done() | |
} | |
roChan := make(<-chan struct{}) | |
ts := NewTimeoutScheduler(onTimeout, roChan, newTimer) | |
ts.RegisterTimeout(time.Hour) // First timeout is registered | |
ts.RegisterTimeout(time.Hour) // Second should not | |
// Clock ticks are after registering, in order to ensure onTimeout() isn't fired until second registration is invoked. | |
clock <- time.Now() | |
clock <- time.Now() | |
wg.Wait() | |
var wg sync.WaitGroup | |
wg.Add(1) | |
preemptChan := make(<-chan struct{}) | |
ts := NewTimeoutScheduler(wg.Done, preemptChan, newTimer) | |
ts.RegisterTimeout(time.Hour) // First timeout is registered | |
ts.RegisterTimeout(time.Hour) // Second should not | |
// Clock ticks are after registering, in order to ensure wg.Done() isn't fired until second registration is invoked. | |
clock <- time.Now() | |
clock <- time.Now() | |
wg.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
roChan
means read-only channel. Agree preemptChan
is better.
} | ||
} | ||
|
||
func TestTimeoutSchedulerConcurrentRegister(_ *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we comment that this test relies on wg.Done
to panic if the counter goes negative? Typically a test that don't use the *testing.T
isn't really a test (as it can't fail normally).
snow/engine/common/timer_test.go
Outdated
initClock: func(_ chan time.Time) {}, | ||
advanceTime: func(_ chan time.Time) {}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit, but typically we don't put the _
if we don't need to.
initClock: func(_ chan time.Time) {}, | |
advanceTime: func(_ chan time.Time) {}, | |
initClock: func(chan time.Time) {}, | |
advanceTime: func(chan time.Time) {}, |
Signed-off-by: Yacov Manevich <[email protected]>
214ed68
to
a7b3ab4
Compare
@StephenButtolph thanks for the review, addressed your comments. |
Why this should be merged
Currently, an engine registers timeouts into the handler, which schedules the timeouts on behalf of the the engine.
The handler then notifies the engine when the timeout expired.
However, the only engine that uses this mechanism is the bootstrapping engine, and not the other engine types such as the snowman and state sync engines.
It therefore makes sense to consolidate the timeout handling instead of delegating them to the handler.
By moving the timeout handling closer to the bootstrapper, we can make the API of the common.Engine be slimmer by removing the Timeout() method from it.
How this works
I refactored the timeout logic and decoupled it from the handler.
I also re-implemented the timeout mechanism in order to be more robust and testable.
How this was tested
The node successfully finished bootstrapping and there were no data races observed in its log.