Skip to content

Commit

Permalink
fix: multicast channel block
Browse files Browse the repository at this point in the history
  • Loading branch information
neurosnap committed Sep 5, 2024
1 parent 81b6f9d commit 96ac147
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
16 changes: 11 additions & 5 deletions multicast.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pubsub

import (
"fmt"
"io"
"log/slog"

Expand All @@ -23,7 +24,13 @@ func (b *PubSubMulticast) Sub(sub *Subscriber) error {
sub.ID = id.String()
b.Logger.Info("sub", "channel", sub.Name, "id", id)
b.subs = append(b.subs, sub)
b.Chan <- sub
select {
case b.Chan <- sub:
// message sent
default:
// message dropped
}

return sub.Wait()
}

Expand Down Expand Up @@ -62,10 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
log.Info("no subs found, waiting for sub")
sub = <-b.Chan
if b.PubMatcher(msg, sub) {
log.Info("sub found")
matches = append(matches, sub)
writers = append(writers, sub.Writer)
break
return b.Pub(msg)
}
}
}
Expand All @@ -76,8 +80,10 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
if err != nil {
log.Error("pub", "err", err)
}
fmt.Println(len(matches))
for _, sub := range matches {
sub.Chan <- err
log.Info("sub unsub")
err = b.UnSub(sub)
if err != nil {
log.Error("unsub err", "err", err)
Expand Down
2 changes: 2 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pubsub

import (
"fmt"
"io"
"log/slog"
)
Expand All @@ -13,6 +14,7 @@ type Subscriber struct {
}

func (s *Subscriber) Wait() error {
fmt.Println("wwww")
err := <-s.Chan
return err
}
Expand Down

0 comments on commit 96ac147

Please sign in to comment.