Skip to content

Commit

Permalink
Close event channel on event listener removal.
Browse files Browse the repository at this point in the history
On invocation of RemoveEventsListener, wait for all pending goroutines
to complete and close the events channel afterwards. This will enable
clients for-looping through the event channel to terminate naturally.

Also merge two tests into one covering all relevant cases.
  • Loading branch information
timoreimann committed Sep 28, 2016
1 parent effb742 commit 154fa94
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 108 deletions.
5 changes: 3 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ var (

// EventsChannelContext holds contextual data for an EventsChannel.
type EventsChannelContext struct {
filter int
done chan struct{}
filter int
done chan struct{}
completion *sync.WaitGroup
}

type marathonClient struct {
Expand Down
20 changes: 15 additions & 5 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/donovanhide/eventsource"
Expand Down Expand Up @@ -57,8 +58,9 @@ func (r *marathonClient) AddEventsListener(filter int) (EventsChannel, error) {

channel := make(EventsChannel)
r.listeners[channel] = EventsChannelContext{
filter: filter,
done: make(chan struct{}, 1),
filter: filter,
done: make(chan struct{}, 1),
completion: &sync.WaitGroup{},
}
return channel, nil
}
Expand All @@ -77,6 +79,12 @@ func (r *marathonClient) RemoveEventsListener(channel EventsChannel) {
if r.config.EventsTransport == EventsTransportCallback && len(r.listeners) == 0 {
r.Unsubscribe(r.SubscriptionURL())
}

// step: wait for pending goroutines to finish and close channel
go func(completion *sync.WaitGroup) {
completion.Wait()
close(channel)
}(context.completion)
}
}

Expand Down Expand Up @@ -255,13 +263,15 @@ func (r *marathonClient) handleEvent(content string) error {
for channel, context := range r.listeners {
// step: check if this listener wants this event type
if event.ID&context.filter != 0 {
go func(ch EventsChannel, done <-chan struct{}, e *Event) {
context.completion.Add(1)
go func(ch EventsChannel, context EventsChannelContext, e *Event) {
defer context.completion.Done()
select {
case ch <- e:
case <-done:
case <-context.done:
// Terminates goroutine.
}
}(channel, context.done, event)
}(channel, context, event)
}
}

Expand Down
197 changes: 96 additions & 101 deletions subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,38 @@ import (

const eventPublishTimeout time.Duration = 250 * time.Millisecond

type testCaseList []testCase

func (l testCaseList) find(name string) *testCase {
for _, testCase := range l {
if testCase.name == name {
return &testCase
}
}
return nil
}

type testCase struct {
name string
source string
expectation interface{}
}

var testCases = map[string]*testCase{
"status_update_event": &testCase{
`{
"eventType": "status_update_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"slaveId": "20140909-054127-177048842-5050-1494-0",
"taskId": "my-app_0-1396592784349",
"taskStatus": "TASK_RUNNING",
"appId": "/my-app",
"host": "slave-1234.acme.org",
"ports": [31372],
"version": "2014-04-04T06:26:23.051Z"
}`,
&EventStatusUpdate{
var testCases = testCaseList{
testCase{
name: "status_update_event",
source: `{
"eventType": "status_update_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"slaveId": "20140909-054127-177048842-5050-1494-0",
"taskId": "my-app_0-1396592784349",
"taskStatus": "TASK_RUNNING",
"appId": "/my-app",
"host": "slave-1234.acme.org",
"ports": [31372],
"version": "2014-04-04T06:26:23.051Z"
}`,
expectation: &EventStatusUpdate{
EventType: "status_update_event",
Timestamp: "2014-03-01T23:29:30.158Z",
SlaveID: "20140909-054127-177048842-5050-1494-0",
Expand All @@ -56,16 +69,17 @@ var testCases = map[string]*testCase{
Version: "2014-04-04T06:26:23.051Z",
},
},
"health_status_changed_event": &testCase{
`{
"eventType": "health_status_changed_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"appId": "/my-app",
"taskId": "my-app_0-1396592784349",
"version": "2014-04-04T06:26:23.051Z",
"alive": true
}`,
&EventHealthCheckChanged{
testCase{
name: "health_status_changed_event",
source: `{
"eventType": "health_status_changed_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"appId": "/my-app",
"taskId": "my-app_0-1396592784349",
"version": "2014-04-04T06:26:23.051Z",
"alive": true
}`,
expectation: &EventHealthCheckChanged{
EventType: "health_status_changed_event",
Timestamp: "2014-03-01T23:29:30.158Z",
AppID: "/my-app",
Expand All @@ -74,23 +88,24 @@ var testCases = map[string]*testCase{
Alive: true,
},
},
"failed_health_check_event": &testCase{
`{
"eventType": "failed_health_check_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"appId": "/my-app",
"taskId": "my-app_0-1396592784349",
"healthCheck": {
"protocol": "HTTP",
"path": "/health",
"portIndex": 0,
"gracePeriodSeconds": 5,
"intervalSeconds": 10,
"timeoutSeconds": 10,
"maxConsecutiveFailures": 3
}
}`,
&EventFailedHealthCheck{
testCase{
name: "failed_health_check_event",
source: `{
"eventType": "failed_health_check_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"appId": "/my-app",
"taskId": "my-app_0-1396592784349",
"healthCheck": {
"protocol": "HTTP",
"path": "/health",
"portIndex": 0,
"gracePeriodSeconds": 5,
"intervalSeconds": 10,
"timeoutSeconds": 10,
"maxConsecutiveFailures": 3
}
}`,
expectation: &EventFailedHealthCheck{
EventType: "failed_health_check_event",
Timestamp: "2014-03-01T23:29:30.158Z",
AppID: "/my-app",
Expand All @@ -113,21 +128,22 @@ var testCases = map[string]*testCase{
},
},
},
"deployment_info": &testCase{
`{
"eventType": "deployment_info",
"timestamp": "2016-07-29T08:03:52.542Z",
"plan": {},
"currentStep": {
"actions": [
{
"type": "ScaleApplication",
"app": "/my-app"
}
]
testCase{
name: "deployment_info",
source: `{
"eventType": "deployment_info",
"timestamp": "2016-07-29T08:03:52.542Z",
"plan": {},
"currentStep": {
"actions": [
{
"type": "ScaleApplication",
"app": "/my-app"
}
}`,
&EventDeploymentInfo{
]
}
}`,
expectation: &EventDeploymentInfo{
EventType: "deployment_info",
Timestamp: "2016-07-29T08:03:52.542Z",
Plan: &DeploymentPlan{},
Expand Down Expand Up @@ -192,6 +208,10 @@ func TestEventStreamConnectionErrorsForwarded(t *testing.T) {
}

func TestEventStreamEventsReceived(t *testing.T) {
if !assert.True(t, len(testCases) > 1, "must have at least 2 test cases to end prematurely") {
return
}

clientCfg := NewDefaultConfig()
config := configContainer{
client: &clientCfg,
Expand All @@ -203,67 +223,42 @@ func TestEventStreamEventsReceived(t *testing.T) {
events, err := endpoint.Client.AddEventsListener(EventIDApplications | EventIDDeploymentInfo)
assert.NoError(t, err)

// Publish test events
go func() {
for _, testCase := range testCases {
endpoint.Server.PublishEvent(testCase.source)
}
}()
// Publish all but one test event.
for i := 0; i < len(testCases)-1; i++ {
endpoint.Server.PublishEvent(testCases[i].source)
}

// Receive test events
for i := 0; i < len(testCases); i++ {
// Receive test events.
for i := 0; i < len(testCases)-1; i++ {
select {
case event := <-events:
assert.Equal(t, testCases[event.Name].expectation, event.Event)
tc := testCases.find(event.Name)
if !assert.NotNil(t, tc, "received unknown event: %s", event.Name) {
continue
}
assert.Equal(t, tc.expectation, event.Event)
case <-time.After(eventPublishTimeout):
assert.Fail(t, "did not receive event in time")
return
}
}
}

func TestEventStreamPrematureTermination(t *testing.T) {
if !assert.True(t, len(testCases) > 1, "must have at least 2 test cases to end prematurely") {
return
}

clientCfg := NewDefaultConfig()
config := configContainer{
client: &clientCfg,
}
config.client.EventsTransport = EventsTransportSSE
endpoint := newFakeMarathonEndpoint(t, &config)
defer endpoint.Close()

events, err := endpoint.Client.AddEventsListener(EventIDApplications | EventIDDeploymentInfo)
if !assert.NoError(t, err) {
return
}

// Publish test events
go func() {
for _, testCase := range testCases {
endpoint.Server.PublishEvent(testCase.source)
}
}()

// Receive single event
select {
case <-events:
case <-time.After(eventPublishTimeout):
assert.Fail(t, "did not receive event in time")
return
}
// Publish last test event that we do not intend to consume anymore.
endpoint.Server.PublishEvent(testCases[len(testCases)-1].source)

// Give event stream some time to buffer another event.
time.Sleep(eventPublishTimeout)

// Trigger done channel closure.
endpoint.Client.RemoveEventsListener(events)

// Give pending goroutine time to consume done signal.
time.Sleep(eventPublishTimeout)

// Validate that channel is closed.
select {
case <-events:
assert.Fail(t, "should not have received additional events")
case _, more := <-events:
assert.False(t, more, "should not have received another event")
default:
// All good.
assert.Fail(t, "channel was not closed")
}
}

0 comments on commit 154fa94

Please sign in to comment.