Skip to content

Commit

Permalink
feat: keep subscribe api
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <[email protected]>
  • Loading branch information
gfanton committed Mar 15, 2022
1 parent c761931 commit 3d71dfb
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 10 deletions.
3 changes: 3 additions & 0 deletions events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// events defines an event subscriber and dispatcher
// Deprecated: use EventBus instead
package events // import "berty.tech/go-orbit-db/events"
240 changes: 240 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package events

import (
"container/list"
"context"
"fmt"
"sync"

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
)

type Event interface{}

// EmitterInterface Root interface for events dispatch
// Deprecated: use event bus directly
type EmitterInterface interface {
// Deprecated: Emit Sends an event to the subscribed listeners
Emit(context.Context, Event)

// Deprecated: GlobalChannel returns a glocal channel that receives emitted events
GlobalChannel(ctx context.Context) <-chan Event

// Deprecated: Subscribe Returns a channel that receives emitted events
Subscribe(ctx context.Context) <-chan Event

// Deprecated: UnsubscribeAll close all listeners channels
UnsubscribeAll()
}

// EventEmitter Registers listeners and dispatches events to them
// Deprecated: use event bus directly
type EventEmitter struct {
bus event.Bus

muEmitters sync.Mutex

cglobal <-chan Event

emitter event.Emitter
cancels []context.CancelFunc
}

func (e *EventEmitter) Emitter(eventType interface{}) (em event.Emitter, err error) {
e.muEmitters.Lock()
bus := e.getBus()
em, err = bus.Emitter(eventType)
e.muEmitters.Unlock()
return
}

type eventBox struct {
evt interface{}
}

// Emit Sends an event to the subscribed listeners
// Deprecated: use event bus directly
func (e *EventEmitter) Emit(ctx context.Context, evt Event) {
e.muEmitters.Lock()

bus := e.getBus()
if e.emitter == nil {
var err error
e.emitter, err = bus.Emitter(new(eventBox))
if err != nil {
serr := fmt.Sprintf("unable to init eventBox emitter: %s", err.Error())
panic(serr)
}
}

e.muEmitters.Unlock()
box := eventBox{evt}
_ = e.emitter.Emit(box)
}

// Subscribe Returns a channel that receives emitted events
// Deprecated: use event bus directly
func (e *EventEmitter) Subscribe(ctx context.Context) <-chan Event {
e.muEmitters.Lock()

bus := e.getBus()

sub, err := bus.Subscribe(event.WildcardSubscription)
if err != nil {
serr := fmt.Sprintf("unable to subscribe: %s", err.Error())
panic(serr)
}

ctx, cancel := context.WithCancel(ctx)
if e.cancels == nil {
e.cancels = []context.CancelFunc{}
}
e.cancels = append(e.cancels, cancel)

e.muEmitters.Unlock()

return e.handleSubscriber(ctx, sub)
}

// UnsubscribeAll close all listeners channels
// Deprecated: use event bus directly
func (e *EventEmitter) UnsubscribeAll() {
e.muEmitters.Lock()
for _, cancel := range e.cancels {
cancel()
}

e.muEmitters.Unlock()
}

func (e *EventEmitter) handleSubscriber(ctx context.Context, sub event.Subscription) <-chan Event {
cevent := make(chan Event, 16)
condProcess := sync.NewCond(&sync.Mutex{})
queue := list.New()
wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
defer sub.Close()

for {
var e interface{}
select {
case e = <-sub.Out():
case <-ctx.Done():
condProcess.Signal()
return
}

if box, ok := e.(eventBox); ok {
e = box.evt
}

condProcess.L.Lock()
if queue.Len() == 0 {
// try to push event to the queue
select {
case cevent <- e:
condProcess.L.Unlock()
continue
default:
}
}

// push elem to the queue if the channel is blocking or
// we already have some events to process
queue.PushBack(e)
// signal that we have element to process
condProcess.Signal()
condProcess.L.Unlock()

}
}()

go func() {
condProcess.L.Lock()
for ctx.Err() == nil {
if queue.Len() == 0 {
condProcess.Wait()
continue
}

e := queue.Remove(queue.Front())

// Unlock cond mutex while sending the event
condProcess.L.Unlock()

select {
case <-ctx.Done():
case cevent <- e:
}

condProcess.L.Lock()
}
condProcess.L.Unlock()

wg.Wait()
close(cevent)
}()

return cevent
}

// GlobalChannel returns a glocal channel that receives emitted events
// Deprecated: use event bus directly
func (e *EventEmitter) GlobalChannel(ctx context.Context) (cc <-chan Event) {
e.muEmitters.Lock()
if e.cglobal == nil {
bus := e.getBus()

sub, err := bus.Subscribe(event.WildcardSubscription)
if err != nil {
serr := fmt.Sprintf("unable to subscribe: %s", err.Error())
panic(serr)
}

ctx, cancel := context.WithCancel(ctx)
if e.cancels == nil {
e.cancels = []context.CancelFunc{}
}
e.cancels = append(e.cancels, cancel)

e.cglobal = e.handleSubscriber(ctx, sub)
}

cc = e.cglobal
e.muEmitters.Unlock()

return
}

func (e *EventEmitter) GetBus() (bus event.Bus) {
e.muEmitters.Lock()
bus = e.getBus()
e.muEmitters.Unlock()
return
}

// will panic if a bus is already set
func (e *EventEmitter) SetBus(bus event.Bus) (err error) {
e.muEmitters.Lock()

if bus == nil {
e.bus = bus
} else {
err = fmt.Errorf("bus is already init")
}

e.muEmitters.Unlock()
return
}

func (e *EventEmitter) getBus() (bus event.Bus) {
if e.bus == nil {
e.bus = eventbus.NewBus()
}

return e.bus
}
15 changes: 15 additions & 0 deletions events/events_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package events

type eventsQueue []Event

func (e eventsQueue) Len() int { return len(e) }

func (e *eventsQueue) Push(x Event) {
*e = append(*e, x)
}

func (e *eventsQueue) Pop() (item Event) {
item, (*e)[0] = (*e)[0], nil
*e = (*e)[1:]
return item
}
118 changes: 118 additions & 0 deletions events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package events

import (
"context"
"fmt"
"testing"
"time"
)

// Sequential write
func TestSequentialWrite(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

e := EventEmitter{}
expectedClients := 10
expectedEvents := 100

chs := make([]<-chan Event, expectedClients)

for i := 0; i < expectedClients; i++ {
chs[i] = e.Subscribe(ctx)
}

go func() {
for i := 0; i < expectedEvents; i++ {
e.Emit(ctx, fmt.Sprintf("%d", i))
}
}()

for i := 0; i < expectedClients; i++ {
for j := 0; j < expectedEvents; j++ {
var item interface{}
select {
case item = <-chs[i]:
case <-time.After(time.Second * 2):
t.Fatalf("timeout while waiting for event: %d %d", i, j)
}
s := item.(string)
if s != fmt.Sprintf("%d", j) {
t.Fatalf("%s should be equal to %d", s, j)
}
}
}

cancel()

<-time.After(time.Millisecond * 100)

for i := 0; i < expectedClients; i++ {
for range chs[i] {
t.Fatal("should not occur")
}
}
}

func TestMissingListeners(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

e := EventEmitter{}
expectedEvents := 10

go func() {
for i := 0; i < expectedEvents; i++ {
e.Emit(ctx, fmt.Sprintf("%d", i))
}

<-time.After(10 * time.Millisecond)
}()

<-time.After(100 * time.Millisecond)
}

func TestPartialListeners(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

e := EventEmitter{}

go func() {
for i := 0; i < 5; i++ {
e.Emit(ctx, fmt.Sprintf("%d", i))
}
}()

<-time.After(time.Millisecond * 100)

subCtx, subCancel := context.WithCancel(context.Background())
sub := e.Subscribe(subCtx)

<-time.After(time.Millisecond * 100)

for i := 5; i < 10; i++ {
e.Emit(ctx, fmt.Sprintf("%d", i))
}

<-time.After(time.Millisecond * 100)

for i := 5; i < 10; i++ {
item := <-sub
itemStr, ok := item.(string)
if !ok {
t.Fatalf("unable to cast")
}

if itemStr != fmt.Sprintf("%d", i) {
t.Errorf("(%s) should be equal to (%d)", itemStr, i)
}
}

<-time.After(time.Second)

subCancel()

for range sub {
t.Fatalf("this should not happen")
}
}
Loading

0 comments on commit 3d71dfb

Please sign in to comment.