Generate events when nodes are approved/rejected #3772
pkg/node/manager/events.go
Outdated
case <-completedChan:
	return nil
case <-ctx.Done():
	return nil
Probably worth returning ctx.Err() here.
Done in ff7828f
pkg/node/manager/events.go
Outdated
for {
	select {
	case <-completedChan:
		return nil
	case <-ctx.Done():
		return nil
	case <-e.clock.After(waitDuration):
		return fmt.Errorf("timed out waiting for %s event callbacks to complete", event.String())
	}
}
We can remove this for loop, since select will block until one of its cases can run.
Done in ff7828f, not sure why it was there in the first place, probably force of habit.
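For reference, a minimal sketch of the wait after both review suggestions are applied: a single select with no surrounding loop, and ctx.Err() returned on cancellation. The helper name waitForCallbacks and its parameters are hypothetical, and time.After stands in for the emitter's injected clock, so this is an illustration rather than the code in ff7828f.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForCallbacks is a hypothetical helper. It blocks until the callbacks
// signal completion, the context ends, or the timeout elapses.
func waitForCallbacks(ctx context.Context, completed <-chan struct{}, timeout time.Duration) error {
	// One select is enough: it blocks until one of its cases can run.
	select {
	case <-completed:
		return nil
	case <-ctx.Done():
		// Report why the context ended instead of returning nil.
		return ctx.Err()
	case <-time.After(timeout): // assumption: real clock, not the mock clock
		return fmt.Errorf("timed out after %s waiting for event callbacks to complete", timeout)
	}
}

// completedWaitErr waits on a channel that is already closed.
func completedWaitErr() error {
	done := make(chan struct{})
	close(done)
	return waitForCallbacks(context.Background(), done, time.Minute)
}

// cancelledWaitErr waits on a context that is already cancelled.
func cancelledWaitErr() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitForCallbacks(ctx, make(chan struct{}), time.Minute)
}

func main() {
	fmt.Println(completedWaitErr())
	fmt.Println(errors.Is(cancelledWaitErr(), context.Canceled))
}
```

Returning ctx.Err() lets the caller tell a cancelled emit apart from one whose callbacks all finished.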
for _, hlr := range e.callbacks {
	wg.Add(1)
	go func(handler NodeEventHandler) {
		handler.HandleNodeEvent(ctx, info, event)
I am thinking it's worth passing a sub-context with a timeout to these methods instead of the parent context. This way we ensure that all the handlers receive the cancellation signal and can stop their execution, and (try to) prevent goroutine leaks on timeout. What do you think?
Good idea, will give it a try. The problem might be that the timeout will not be using the mock clock, and so is only really useful for cleaning up unfinished callbacks rather than being useful to replace the timeout on each callback.
Cancellable version in 1da8eb2
> The problem might be that the timeout will not be using the mock clock, and so is only really useful for cleaning up unfinished callbacks rather than being useful to replace the timeout on each callback.
oo yeah, good point. Change looks good here - thanks.
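A rough sketch of the cancellable approach discussed in this thread, under simplifying assumptions: handlerFunc and emit are hypothetical stand-ins for NodeEventHandler and the emitter's method, events are plain strings, and time.After replaces the injected clock. It is not the code from 1da8eb2. The child context is cancelled when emit returns, so a handler that outlives the timeout still receives a signal to stop.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// handlerFunc is a simplified, hypothetical stand-in for HandleNodeEvent.
type handlerFunc func(ctx context.Context, event string)

// emit runs every handler in its own goroutine with a cancellable child context.
func emit(parent context.Context, handlers []handlerFunc, event string, timeout time.Duration) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // tells any handler still running to stop once emit returns

	var wg sync.WaitGroup
	for _, h := range handlers {
		wg.Add(1)
		go func(handler handlerFunc) {
			defer wg.Done()
			handler(ctx, event)
		}(h)
	}

	// Close completed once every handler has returned.
	completed := make(chan struct{})
	go func() {
		wg.Wait()
		close(completed)
	}()

	select {
	case <-completed:
		return nil
	case <-parent.Done():
		return parent.Err()
	case <-time.After(timeout):
		return fmt.Errorf("timed out waiting for %s event callbacks to complete", event)
	}
}

// slowHandlerSawCancel reports whether a handler that outlives the timeout
// observed cancellation of the context it was given.
func slowHandlerSawCancel() bool {
	sawCancel := make(chan bool, 1)
	slow := func(ctx context.Context, _ string) {
		select {
		case <-ctx.Done():
			sawCancel <- true
		case <-time.After(5 * time.Second):
			sawCancel <- false
		}
	}
	err := emit(context.Background(), []handlerFunc{slow}, "approved", 20*time.Millisecond)
	return err != nil && <-sawCancel
}

func main() {
	fmt.Println(slowHandlerSawCancel())
}
```

As noted in the thread, the cancellation here only cleans up unfinished callbacks. The timeout that decides when to give up remains separate, so it can keep using a mock clock in tests.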
pkg/node/manager/events.go
Outdated
		wg.Wait()
	}()

	waitDuration := 1 * time.Second
Can we make this a field on the NodeEventEmitter? It will be easier to change in the event we need to adjust this value later.
Done in ff7828f
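One way this could look, sketched with assumed names: the field callbackTimeout, the constant defaultCallbackTimeout, and the option WithCallbackTimeout are all hypothetical, and the struct is cut down to that single field. The option follows the same functional-option shape as the WithClock option in this PR.

```go
package main

import (
	"fmt"
	"time"
)

// defaultCallbackTimeout mirrors the 1 second value hard-coded in the diff above.
const defaultCallbackTimeout = 1 * time.Second

// NodeEventEmitter is reduced to the one field relevant to this sketch.
type NodeEventEmitter struct {
	callbackTimeout time.Duration // hypothetical field name
}

// NodeEventEmitterOption configures an emitter at construction time.
type NodeEventEmitterOption func(*NodeEventEmitter)

// WithCallbackTimeout is a hypothetical option that overrides the default wait.
func WithCallbackTimeout(d time.Duration) NodeEventEmitterOption {
	return func(e *NodeEventEmitter) {
		e.callbackTimeout = d
	}
}

// NewNodeEventEmitter applies the default first, then any options in order.
func NewNodeEventEmitter(opts ...NodeEventEmitterOption) *NodeEventEmitter {
	e := &NodeEventEmitter{callbackTimeout: defaultCallbackTimeout}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

func main() {
	fmt.Println(NewNodeEventEmitter().callbackTimeout)
	fmt.Println(NewNodeEventEmitter(WithCallbackTimeout(5 * time.Second)).callbackTimeout)
}
```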
pkg/node/manager/interfaces.go
Outdated
type NodeEventHandler interface {
	HandleNodeEvent(ctx context.Context, info models.NodeInfo, event NodeEvent)
}
I would find the code slightly more readable if we declared this interface where it's expected/used, before the RegisterHandler method on the NodeEventEmitter, instead of in a separate file.
Reasonable, done in ff7828f. I think I put it in an interfaces.go as there's been a bit of a pattern with the eval stuff that there's an interfaces.go and a types.go.
@@ -815,3 +816,10 @@ func (b *InMemoryBroker) registerMetrics() (metric.Registration, error) {
	}, orchestrator.EvalBrokerReady, orchestrator.EvalBrokerInflight, orchestrator.EvalBrokerPending,
		orchestrator.EvalBrokerWaiting, orchestrator.EvalBrokerCancelable)
}

func (b *InMemoryBroker) HandleNodeEvent(ctx context.Context, info models.NodeInfo, evt manager.NodeEvent) {
For readability, add a compile-time assertion to the top of this file:
var _ manager.NodeEventHandler = &InMemoryBroker{}
Done in ff7828f
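To illustrate what the assertion buys, here is a self-contained sketch with simplified, hypothetical types: the handler takes a node ID string instead of models.NodeInfo, and the broker only counts events. It uses the (*InMemoryBroker)(nil) form, which checks the same thing as &InMemoryBroker{} without building a value. If the method set ever stops matching the interface, the build fails at the assertion line.

```go
package main

import (
	"context"
	"fmt"
)

// NodeEvent and NodeEventHandler are simplified stand-ins for the PR's types.
type NodeEvent int

type NodeEventHandler interface {
	HandleNodeEvent(ctx context.Context, nodeID string, event NodeEvent)
}

// InMemoryBroker is a toy version that only counts the events it receives.
type InMemoryBroker struct {
	handled int
}

// Compile-time assertion: the build breaks here if *InMemoryBroker
// no longer satisfies NodeEventHandler.
var _ NodeEventHandler = (*InMemoryBroker)(nil)

func (b *InMemoryBroker) HandleNodeEvent(_ context.Context, _ string, _ NodeEvent) {
	b.handled++
}

// handledAfterOneEvent delivers one event through the interface type.
func handledAfterOneEvent() int {
	broker := &InMemoryBroker{}
	var handler NodeEventHandler = broker
	handler.HandleNodeEvent(context.Background(), "node-1", NodeEvent(0))
	return broker.handled
}

func main() {
	fmt.Println(handledAfterOneEvent())
}
```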
@@ -69,7 +69,6 @@ func (t liveness) IsValid() bool {
type livenessContainer struct {
	CONNECTED NodeState
	DISCONNECTED NodeState
	HEALTHY NodeState
CONNECTED now implies healthy, I assume? Or was this just never used?
It was never used and I think only worked because the default for ints is 0.
Correct, "healthy" implied something that we don't know about the node. "Connected" is the right term (for now).
// WithClock is an option that can be used to set the clock for the NodeEventEmitter. This is useful
// for testing purposes.
func WithClock(clock clock.Clock) NodeEventEmitterOption {
	return func(emitter *NodeEventEmitter) {
		emitter.clock = clock
	}
}
Not sure this is used anywhere, was it meant to be used in a test?
It was, and is with b9a6953
go func() {
	s.clock.Add(10 * time.Second)
}()
Why does this need a goroutine? I'd assume adding 10 seconds to the clock is non-blocking?
Some weirdness with the mock clock where it won't work when it's inline. I've been trying to work out if I'm just holding it wrong but I suspect it's something to do with the work in EmitEvent happening in a goroutine and requiring a yield (but I'm not 100% sure)
@rossjones I have implemented a fix for the NodeInfo overwrite bug here: #3785
We want to deliver events to a component that can use the job store, together with the delivered events, to interact with an evaluation broker. This commit implements that event listener.
@frrist are you owning this PR now? What do we need to do to unblock it and merge it?
@wdbaruni yeah I can take this over.
When nodes are approved or rejected, or connected/disconnected, the system may want to respond to these events by creating new evaluations, or cancelling previous tasks.
This PR adds an event emitter which allows implementations of NodeEventHandler to receive events that are one of:

All events are delivered in a goroutine, so it is the recipient's responsibility to handle the event quickly, or to create a new goroutine and return immediately.
Currently this PR delivers events to an event listener in the manager package, which for now only logs that it has received an event. In future, we will want to modify this behaviour to create new evaluations, or stop current executions, based on the node info that is delivered with the event, the job types currently deployed, and the type of event. To enable this, the NodeEventListener is provided with a reference to the evaluation broker and the job store.
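As an illustration of a recipient that returns quickly, here is a hedged sketch of a handler that hands each event to a buffered channel for a worker to process later. Everything in it is hypothetical: queueingHandler, the string-based NodeEvent, the node ID parameter, and the choice to drop events when the buffer is full are assumptions made for the example, not behaviour from this PR.

```go
package main

import (
	"context"
	"fmt"
)

// NodeEvent is a simplified stand-in for the PR's event type.
type NodeEvent string

// queueingHandler is a hypothetical handler that never blocks the emitter.
type queueingHandler struct {
	events chan NodeEvent
}

func newQueueingHandler(buffer int) *queueingHandler {
	return &queueingHandler{events: make(chan NodeEvent, buffer)}
}

// HandleNodeEvent enqueues the event and returns immediately.
// If the buffer is full the event is dropped (a choice made for this sketch).
func (h *queueingHandler) HandleNodeEvent(_ context.Context, _ string, event NodeEvent) {
	select {
	case h.events <- event:
	default:
	}
}

// drain processes everything currently queued and returns how many events it saw.
// A real worker would run this kind of loop in its own goroutine.
func (h *queueingHandler) drain(process func(NodeEvent)) int {
	n := 0
	for {
		select {
		case evt := <-h.events:
			process(evt)
			n++
		default:
			return n
		}
	}
}

// queuedAfterThreeEvents sends three events to a handler with room for two.
func queuedAfterThreeEvents() int {
	h := newQueueingHandler(2)
	for _, evt := range []NodeEvent{"approved", "rejected", "connected"} {
		h.HandleNodeEvent(context.Background(), "node-1", evt)
	}
	return h.drain(func(NodeEvent) {})
}

func main() {
	fmt.Println(queuedAfterThreeEvents())
}
```

Whether to drop, block, or grow the queue when the worker falls behind is a design decision for each handler; the sketch only shows that the delivery goroutine is released straight away.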