Skip to content

Commit

Permalink
fix: don't unsub full subs
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Apr 27, 2024
1 parent f212080 commit 7215a79
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions internal/pubsub/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,24 @@ func (b *Broker[T]) Subscribe(ctx context.Context) <-chan resource.Event[T] {

// Publish an event to subscribers.
//
// TODO: don't forceably unsubscribe full subscribers: subscribers in pug
// typically aren't setup to re-subscribe
// TODO: there is the potential for a subscriber to become full, i.e. its
// buffered channel is full, in which case the broker will block until the
// channel has free capacity again. This should only happen in extremis, e.g. a
// user has a shit-load of modules/workspaces and invokes a massive number of
// parallel tasks, which in turn publishes a shit-load of events. And if it
// happens, it *should* only happen briefly before the subscriber consumes from
// its channel, freeing up capacity. But if the subscriber does not consume
// because it has blocked on something else indefinitely then the broker will
// block indefinitely.
//
// We need need to know when this happens, via some sort of surfacing of
// metrics that does not get blocked itself...
func (b *Broker[T]) Publish(t resource.EventType, payload T) {
var fullSubscribers []chan resource.Event[T]

b.mu.Lock()
for sub := range b.subs {
select {
case sub <- resource.Event[T]{Type: t, Payload: payload}:
continue
default:
// could not publish event to subscriber because their buffer is
// full, so add them to a list for action below
fullSubscribers = append(fullSubscribers, sub)
}
sub <- resource.Event[T]{Type: t, Payload: payload}
}
b.mu.Unlock()

// forceably unsubscribe full subscribers and leave it to them to
// re-subscribe
for _, name := range fullSubscribers {
b.logger.Error("unsubscribing full subscriber", "sub", name, "queue_length", subBufferSize)
b.unsubscribe(name)
}
}

func (b *Broker[T]) unsubscribe(sub chan resource.Event[T]) {
Expand Down

0 comments on commit 7215a79

Please sign in to comment.