Generate events when nodes are approved/rejected #3772

Open

Wants to merge 9 commits into main

1 change: 0 additions & 1 deletion pkg/models/node_state.go
@@ -69,7 +69,6 @@ func (t liveness) IsValid() bool {
type livenessContainer struct {
CONNECTED NodeState
DISCONNECTED NodeState
HEALTHY NodeState
Member:
CONNECTED now implies healthy, I assume? Or was this just never used?

Contributor Author:
It was never used, and I think it only worked because the default for ints is 0.

Collaborator:
Correct "healthy" implied something that we don't know about the node. "Connected" is the right term (for now)

}

var NodeStates = livenessContainer{
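As a quick illustration of the zero-value point above, here is a self-contained sketch. The type names mirror the diff but are simplified and hypothetical; the real liveness/NodeState definitions in pkg/models are richer.

package main

import "fmt"

// liveness mirrors the pattern under discussion: an int-backed state type
// whose zero value silently stands in for any field that is never assigned.
type liveness int

type livenessContainer struct {
	CONNECTED    liveness
	DISCONNECTED liveness
	HEALTHY      liveness // never assigned below
}

var NodeStates = livenessContainer{
	CONNECTED:    1,
	DISCONNECTED: 2,
	// HEALTHY is omitted, so it defaults to 0.
}

func main() {
	// Any code that referenced NodeStates.HEALTHY was really passing around 0,
	// which is why the field can be removed without changing behaviour.
	fmt.Println(NodeStates.HEALTHY) // prints 0
}
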
35 changes: 31 additions & 4 deletions pkg/node/heartbeat/server.go
@@ -15,6 +15,8 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/pubsub"
)

type NodeConnectionStateCallback func(ctx context.Context, nodeID string)

type HeartbeatServerParams struct {
Client *nats.Conn
Topic string
@@ -30,6 +32,8 @@ type HeartbeatServer struct {
livenessMap *concurrency.StripedMap[models.NodeState]
checkFrequency time.Duration
disconnectedAfter time.Duration
onConnected NodeConnectionStateCallback
onDisconnected NodeConnectionStateCallback
}

type TimestampedHeartbeat struct {
@@ -70,6 +74,13 @@ func NewServer(params HeartbeatServerParams) (*HeartbeatServer, error) {
}, nil
}

func (h *HeartbeatServer) AttachCallbacks(
onConnected NodeConnectionStateCallback,
onDisconnected NodeConnectionStateCallback) {
h.onConnected = onConnected
h.onDisconnected = onDisconnected
}

func (h *HeartbeatServer) Start(ctx context.Context) error {
if err := h.subscription.Subscribe(ctx, h); err != nil {
return err
@@ -128,15 +139,31 @@ func (h *HeartbeatServer) CheckQueue(ctx context.Context) {
}

if item.Value.Timestamp < disconnectedUnder {
h.markNodeAs(item.Value.NodeID, models.NodeStates.DISCONNECTED)
h.markNodeAs(ctx, item.Value.NodeID, models.NodeStates.DISCONNECTED)
}
}
}

// markNodeAs will mark a node as being in a certain liveness state. This is used to update
// the node's info to include the liveness state.
func (h *HeartbeatServer) markNodeAs(nodeID string, state models.NodeState) {
func (h *HeartbeatServer) markNodeAs(ctx context.Context, nodeID string, state models.NodeState) {
// Get the current state (if any for this node)
current, found := h.livenessMap.Get(nodeID)
if found && current == state {
return
}

h.livenessMap.Put(nodeID, state)

if state == models.NodeStates.CONNECTED {
if h.onConnected != nil {
h.onConnected(ctx, nodeID)
}
} else if state == models.NodeStates.DISCONNECTED {
if h.onDisconnected != nil {
h.onDisconnected(ctx, nodeID)
}
}
}

// UpdateNode will add the liveness for specific nodes to their NodeInfo
@@ -173,6 +200,8 @@ func (h *HeartbeatServer) Handle(ctx context.Context, message Heartbeat) error {

timestamp := h.clock.Now().UTC().Unix()

h.markNodeAs(ctx, message.NodeID, models.NodeStates.CONNECTED)

if h.pqueue.Contains(message.NodeID) {
// If we think we already have a heartbeat from this node, we'll update the
// timestamp of the entry so it is re-prioritized in the queue by dequeuing
@@ -197,8 +226,6 @@ func (h *HeartbeatServer) Handle(ctx context.Context, message Heartbeat) error {
h.pqueue.Enqueue(TimestampedHeartbeat{Heartbeat: message, Timestamp: timestamp}, timestamp)
}

h.markNodeAs(message.NodeID, models.NodeStates.HEALTHY)

return nil
}

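For context on the new AttachCallbacks hook, here is a hedged wiring sketch. The function, its package, and the nodeID-to-NodeInfo lookup are assumptions; this diff does not show how the node manager actually connects the heartbeat server to the NodeEventEmitter introduced below.

package example // hypothetical wiring, not part of this PR

import (
	"context"

	"github.com/bacalhau-project/bacalhau/pkg/models"
	"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
	"github.com/bacalhau-project/bacalhau/pkg/node/manager"
)

// wireHeartbeatEvents forwards liveness transitions from the heartbeat server
// into the NodeEventEmitter added later in this PR. The lookup argument is an
// assumption: resolving a nodeID to a models.NodeInfo is not shown in this diff.
func wireHeartbeatEvents(
	srv *heartbeat.HeartbeatServer,
	emitter *manager.NodeEventEmitter,
	lookup func(nodeID string) (models.NodeInfo, bool),
) {
	srv.AttachCallbacks(
		func(ctx context.Context, nodeID string) {
			if info, ok := lookup(nodeID); ok {
				// Emit errors (e.g. the emit timeout) are ignored in this sketch.
				_ = emitter.EmitEvent(ctx, info, manager.NodeEventConnected)
			}
		},
		func(ctx context.Context, nodeID string) {
			if info, ok := lookup(nodeID); ok {
				_ = emitter.EmitEvent(ctx, info, manager.NodeEventDisconnected)
			}
		},
	)
}
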
31 changes: 31 additions & 0 deletions pkg/node/manager/event_listener.go
@@ -0,0 +1,31 @@
package manager

import (
"context"

"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
"github.com/rs/zerolog/log"
)

type NodeEventListener struct {
broker orchestrator.EvaluationBroker
jobstore jobstore.Store
}

func NewNodeEventListener(broker orchestrator.EvaluationBroker, jobstore jobstore.Store) *NodeEventListener {
return &NodeEventListener{
broker: broker,
jobstore: jobstore,
}
}

// HandleNodeEvent will receive events from the node manager, and is responsible for deciding what
// to do in response to those events. This NodeEventHandler implementation is expected to
// create new evaluations based on the events received.
func (n *NodeEventListener) HandleNodeEvent(ctx context.Context, info models.NodeInfo, evt NodeEvent) {
log.Ctx(ctx).Info().Msgf("Received node event %s for node %s", evt.String(), info.NodeID)
}

var _ NodeEventHandler = &NodeEventListener{}
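For reference, a minimal usage sketch of the handler contract above, using only the constructors and methods introduced in this PR. The broker, store, and node info are assumed to come from whatever the caller has already assembled, which this diff does not show.

package example // hypothetical usage sketch, not part of this PR

import (
	"context"

	"github.com/bacalhau-project/bacalhau/pkg/jobstore"
	"github.com/bacalhau-project/bacalhau/pkg/models"
	"github.com/bacalhau-project/bacalhau/pkg/node/manager"
	"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
)

// wireNodeEvents registers the listener with an emitter and fires a single
// approval event through it. Today the listener only logs; per the doc
// comment above it is expected to grow evaluation-creation logic.
func wireNodeEvents(ctx context.Context, broker orchestrator.EvaluationBroker, store jobstore.Store, info models.NodeInfo) error {
	emitter := manager.NewNodeEventEmitter()
	emitter.RegisterHandler(manager.NewNodeEventListener(broker, store))
	return emitter.EmitEvent(ctx, info, manager.NodeEventApproved)
}
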
127 changes: 127 additions & 0 deletions pkg/node/manager/events.go
@@ -0,0 +1,127 @@
//go:generate mockgen --source events.go --destination mocks.go --package manager
package manager

import (
"context"
"fmt"
"sync"
"time"

"github.com/benbjohnson/clock"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

// NodeEvent represents the type of event that can be emitted by the NodeEventEmitter.
type NodeEvent int

const (
NodeEventApproved NodeEvent = iota
NodeEventRejected
NodeEventDeleted
NodeEventConnected
NodeEventDisconnected
)

func (n NodeEvent) String() string {
if n == NodeEventApproved {
return "NodeEventApproved"
} else if n == NodeEventRejected {
return "NodeEventRejected"
} else if n == NodeEventDeleted {
return "NodeEventDeleted"
} else if n == NodeEventConnected {
return "NodeEventConnected"
} else if n == NodeEventDisconnected {
return "NodeEventDisconnected"
}

return "UnknownNodeEvent"
}

type NodeEventEmitterOption func(emitter *NodeEventEmitter)

// WithClock is an option that can be used to set the clock for the NodeEventEmitter. This is useful
// for testing purposes.
func WithClock(clock clock.Clock) NodeEventEmitterOption {
return func(emitter *NodeEventEmitter) {
emitter.clock = clock
}
}
Comment on lines +44 to +50
Member:
Not sure this is used anywhere; was it meant to be used in a test?

Contributor Author:
It was, and it is now used as of b9a6953.


// NodeEventHandler defines the interface for components which wish to respond to node events
type NodeEventHandler interface {
HandleNodeEvent(ctx context.Context, info models.NodeInfo, event NodeEvent)
}

// NodeEventEmitter is a struct that will be used to emit events and register callbacks for those events.
// Events will be emitted by the node manager when a node is approved or rejected, and the expectation
// is that they will be consumed by the evaluation broker to create new evaluations.
// It is safe for concurrent use.
type NodeEventEmitter struct {
mu sync.Mutex
callbacks []NodeEventHandler
clock clock.Clock
emitTimeout time.Duration
}

func NewNodeEventEmitter(options ...NodeEventEmitterOption) *NodeEventEmitter {
emitter := &NodeEventEmitter{
callbacks: make([]NodeEventHandler, 0),
clock: clock.New(),
emitTimeout: 1 * time.Second,
}

for _, option := range options {
option(emitter)
}

return emitter
}

// RegisterHandler will register a handler and add it to the list of existing handlers,
// all of which will be called when an event is emitted.
func (e *NodeEventEmitter) RegisterHandler(callback NodeEventHandler) {
e.mu.Lock()
defer e.mu.Unlock()

e.callbacks = append(e.callbacks, callback)
}

// EmitEvent will emit an event and call all of the registered handlers. Each handler is
// called in its own goroutine and is expected to complete quickly.
func (e *NodeEventEmitter) EmitEvent(ctx context.Context, info models.NodeInfo, event NodeEvent) error {
e.mu.Lock()
defer e.mu.Unlock()

completedChan := make(chan struct{})
wg := sync.WaitGroup{}

newCtx, cancel := context.WithCancel(ctx)
defer cancel()

for _, hlr := range e.callbacks {
wg.Add(1)
go func(handler NodeEventHandler, ctx context.Context) {
defer wg.Done()

handler.HandleNodeEvent(ctx, info, event)
Member:
I'm thinking it's worth passing a sub-context with a timeout to these methods instead of the parent context. That way we ensure that all the handlers receive the cancellation signal and can stop their execution, and we (try to) prevent goroutine leaks on timeout. What do you think?

Contributor Author (@rossjones, Apr 10, 2024):
Good idea, I'll give it a try. The problem might be that the timeout would not use the mock clock, and so it is only really useful for cleaning up unfinished callbacks rather than for replacing the timeout on each callback.

Cancellable version in 1da8eb2

Member:
> The problem might be that the timeout will not be using the mock clock, and so is only really useful for cleaning up unfinished callbacks rather than being useful to replace the timeout on each callback

Ooh yeah, good point. The change looks good here - thanks.

}(hlr, newCtx)
}

// Wait for the waitgroup and then close the channel to signal completion. This allows
// us to select on the completed channel as well as the timeout
go func() {
defer close(completedChan)
wg.Wait()
}()

select {
case <-completedChan:
return nil
case <-ctx.Done():
return ctx.Err()
case <-e.clock.After(e.emitTimeout):
return fmt.Errorf("timed out waiting for %s event callbacks to complete", event.String())
}
}
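Following up on the sub-context discussion in the review thread above, here is a hedged sketch of the shape the reviewer suggested. It is not the code that landed (the PR kept context.WithCancel, as shown above, because context.WithTimeout runs on the wall clock rather than the injected mock clock), and it assumes the method lives alongside EmitEvent in events.go so it can reuse the emitter's fields and imports.

// emitWithDeadline is a hypothetical alternative to EmitEvent: each handler
// receives a sub-context that carries both the parent's cancellation and a
// real-time deadline, so slow handlers can observe ctx.Done() and exit
// instead of leaking on timeout.
func (e *NodeEventEmitter) emitWithDeadline(ctx context.Context, info models.NodeInfo, event NodeEvent) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	subCtx, cancel := context.WithTimeout(ctx, e.emitTimeout)
	defer cancel() // releases any handler goroutine that honours subCtx.Done()

	var wg sync.WaitGroup
	for _, hlr := range e.callbacks {
		wg.Add(1)
		go func(handler NodeEventHandler) {
			defer wg.Done()
			handler.HandleNodeEvent(subCtx, info, event)
		}(hlr)
	}

	completed := make(chan struct{})
	go func() {
		defer close(completed)
		wg.Wait()
	}()

	select {
	case <-completed:
		return nil
	case <-subCtx.Done():
		return subCtx.Err()
	}
}
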
101 changes: 101 additions & 0 deletions pkg/node/manager/events_test.go
@@ -0,0 +1,101 @@
//go:build unit || !integration

package manager_test

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
gomock "go.uber.org/mock/gomock"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/node/manager"
"github.com/stretchr/testify/suite"
)

type EventEmitterSuite struct {
suite.Suite
ctrl *gomock.Controller
ctx context.Context
clock *clock.Mock
}

func TestEventEmitterSuite(t *testing.T) {
suite.Run(t, new(EventEmitterSuite))
}

func (s *EventEmitterSuite) SetupTest() {
s.ctrl = gomock.NewController(s.T())
s.ctx = context.Background()
s.clock = clock.NewMock()
}

func (s *EventEmitterSuite) TestNewNodeEventEmitter() {
e := manager.NewNodeEventEmitter()
s.NotNil(e)

mockHandler := manager.NewMockNodeEventHandler(s.ctrl)
mockHandler.EXPECT().HandleNodeEvent(gomock.Any(), gomock.Any(), manager.NodeEventApproved)

e.RegisterHandler(mockHandler)

err := e.EmitEvent(s.ctx, models.NodeInfo{}, manager.NodeEventApproved)
s.NoError(err)
}

func (s *EventEmitterSuite) TestRegisterCallback() {
e := manager.NewNodeEventEmitter()
s.NotNil(e)

mockHandler := manager.NewMockNodeEventHandler(s.ctrl)
e.RegisterHandler(mockHandler)
}

func (s *EventEmitterSuite) TestEmitEvent() {
e := manager.NewNodeEventEmitter()
s.NotNil(e)

mockHandler := manager.NewMockNodeEventHandler(s.ctrl)
mockHandler.EXPECT().HandleNodeEvent(gomock.Any(), gomock.Any(), manager.NodeEventApproved)
mockHandler.EXPECT().HandleNodeEvent(gomock.Any(), gomock.Any(), manager.NodeEventRejected)

e.RegisterHandler(mockHandler)

err := e.EmitEvent(s.ctx, models.NodeInfo{}, manager.NodeEventApproved)
s.NoError(err)

err = e.EmitEvent(s.ctx, models.NodeInfo{}, manager.NodeEventRejected)
s.NoError(err)
}

func (s *EventEmitterSuite) TestEmitEventWithNoCallbacks() {
e := manager.NewNodeEventEmitter()
s.NotNil(e)

err := e.EmitEvent(s.ctx, models.NodeInfo{}, manager.NodeEventApproved)
s.NoError(err)
}

func (s *EventEmitterSuite) TestEmitWithSlowCallback() {
e := manager.NewNodeEventEmitter(manager.WithClock(s.clock))
s.NotNil(e)

e.RegisterHandler(testSleepyHandler{s.clock})

go func() {
s.clock.Add(10 * time.Second)
}()
Comment on lines +87 to +89
Member:
Why does this need a goroutine? I'd assume adding 10 seconds to the clock is non-blocking?

Contributor Author:
There's some weirdness with the mock clock where it won't work when the Add is inline. I've been trying to work out if I'm just holding it wrong, but I suspect it's something to do with the work in EmitEvent happening in a goroutine and requiring a yield (though I'm not 100% sure).


err := e.EmitEvent(s.ctx, models.NodeInfo{}, manager.NodeEventRejected)
s.Error(err)
}

type testSleepyHandler struct {
c *clock.Mock
}

func (t testSleepyHandler) HandleNodeEvent(ctx context.Context, info models.NodeInfo, event manager.NodeEvent) {
t.c.Sleep(2 * time.Second)
}
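To make the goroutine discussion above concrete, here is a hedged, standalone variant of the slow-callback test. It is not part of the PR, and the 10ms real-time pause is an assumption whose only purpose is to let EmitEvent reach clock.After before the mock clock is advanced.

package manager_test

import (
	"context"
	"testing"
	"time"

	"github.com/benbjohnson/clock"

	"github.com/bacalhau-project/bacalhau/pkg/models"
	"github.com/bacalhau-project/bacalhau/pkg/node/manager"
)

type sleepyHandler struct{ c *clock.Mock }

func (h sleepyHandler) HandleNodeEvent(ctx context.Context, info models.NodeInfo, event manager.NodeEvent) {
	h.c.Sleep(2 * time.Second) // blocks until the mock clock advances past +2s
}

func TestSlowHandlerTimesOut(t *testing.T) {
	mockClock := clock.NewMock()
	e := manager.NewNodeEventEmitter(manager.WithClock(mockClock))
	e.RegisterHandler(sleepyHandler{mockClock})

	// The mock timer created by clock.After inside EmitEvent does not exist
	// until EmitEvent runs, so advancing the clock inline before the call has
	// nothing to fire. Advancing it from a goroutine, after a short real-time
	// pause, gives EmitEvent a chance to register the timer first.
	go func() {
		time.Sleep(10 * time.Millisecond)
		mockClock.Add(10 * time.Second)
	}()

	if err := e.EmitEvent(context.Background(), models.NodeInfo{}, manager.NodeEventRejected); err == nil {
		t.Fatal("expected EmitEvent to time out")
	}
}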