From aabf41661b1008b481a8dfa7c1aba46ad195ee65 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Wed, 28 Sep 2016 23:16:50 +0200 Subject: [PATCH] Close event channel on event listener removal. On invocation of RemoveEventsListener, wait for all pending goroutines to complete and close the events channel afterwards. This will enable clients for-looping through the event channel to terminate naturally. Also merge two tests into one covering all relevant cases. --- CHANGELOG.md | 3 + client.go | 5 +- subscription.go | 20 +++-- subscription_test.go | 200 +++++++++++++++++++++---------------------- 4 files changed, 120 insertions(+), 108 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c1b427..2e8687e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- [#211][PR211] Close event channel on event listener removal. ## [0.3.0] - 2016-09-28 - [#201][PR201]: Subscribe method is now exposed on the client to allow subscription of callback URL's @@ -74,6 +76,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). [0.1.1]: https://github.com/gambol99/go-marathon/compare/v0.1.0...v0.1.1 [0.1.0]: https://github.com/gambol99/go-marathon/compare/v0.0.1...v0.1.0 +[PR211]: https://github.com/gambol99/go-marathon/pull/211 [PR205]: https://github.com/gambol99/go-marathon/pull/205 [PR202]: https://github.com/gambol99/go-marathon/pull/202 [PR201]: https://github.com/gambol99/go-marathon/pull/201 diff --git a/client.go b/client.go index 8f19406..201e2c9 100644 --- a/client.go +++ b/client.go @@ -161,8 +161,9 @@ var ( // EventsChannelContext holds contextual data for an EventsChannel. type EventsChannelContext struct { - filter int - done chan struct{} + filter int + done chan struct{} + completion *sync.WaitGroup } type marathonClient struct { diff --git a/subscription.go b/subscription.go index cc11799..a4c834a 100644 --- a/subscription.go +++ b/subscription.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "strings" + "sync" "time" "github.com/donovanhide/eventsource" @@ -57,8 +58,9 @@ func (r *marathonClient) AddEventsListener(filter int) (EventsChannel, error) { channel := make(EventsChannel) r.listeners[channel] = EventsChannelContext{ - filter: filter, - done: make(chan struct{}, 1), + filter: filter, + done: make(chan struct{}, 1), + completion: &sync.WaitGroup{}, } return channel, nil } @@ -77,6 +79,12 @@ func (r *marathonClient) RemoveEventsListener(channel EventsChannel) { if r.config.EventsTransport == EventsTransportCallback && len(r.listeners) == 0 { r.Unsubscribe(r.SubscriptionURL()) } + + // step: wait for pending goroutines to finish and close channel + go func(completion *sync.WaitGroup) { + completion.Wait() + close(channel) + }(context.completion) } } @@ -255,13 +263,15 @@ func (r *marathonClient) handleEvent(content string) error { for channel, context := range r.listeners { // step: check if this listener wants this event type if event.ID&context.filter != 0 { - go func(ch EventsChannel, done <-chan struct{}, e *Event) { + context.completion.Add(1) + go func(ch EventsChannel, context EventsChannelContext, e *Event) { + defer context.completion.Done() select { case ch <- e: - case <-done: + case <-context.done: // Terminates goroutine. } - }(channel, context.done, event) + }(channel, context, event) } } diff --git a/subscription_test.go b/subscription_test.go index eed029f..d025ade 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -26,25 +26,38 @@ import ( const eventPublishTimeout time.Duration = 250 * time.Millisecond +type testCaseList []testCase + +func (l testCaseList) find(name string) *testCase { + for _, testCase := range l { + if testCase.name == name { + return &testCase + } + } + return nil +} + type testCase struct { + name string source string expectation interface{} } -var testCases = map[string]*testCase{ - "status_update_event": &testCase{ - `{ - "eventType": "status_update_event", - "timestamp": "2014-03-01T23:29:30.158Z", - "slaveId": "20140909-054127-177048842-5050-1494-0", - "taskId": "my-app_0-1396592784349", - "taskStatus": "TASK_RUNNING", - "appId": "/my-app", - "host": "slave-1234.acme.org", - "ports": [31372], - "version": "2014-04-04T06:26:23.051Z" - }`, - &EventStatusUpdate{ +var testCases = testCaseList{ + testCase{ + name: "status_update_event", + source: `{ + "eventType": "status_update_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "slaveId": "20140909-054127-177048842-5050-1494-0", + "taskId": "my-app_0-1396592784349", + "taskStatus": "TASK_RUNNING", + "appId": "/my-app", + "host": "slave-1234.acme.org", + "ports": [31372], + "version": "2014-04-04T06:26:23.051Z" +}`, + expectation: &EventStatusUpdate{ EventType: "status_update_event", Timestamp: "2014-03-01T23:29:30.158Z", SlaveID: "20140909-054127-177048842-5050-1494-0", @@ -56,16 +69,17 @@ var testCases = map[string]*testCase{ Version: "2014-04-04T06:26:23.051Z", }, }, - "health_status_changed_event": &testCase{ - `{ - "eventType": "health_status_changed_event", - "timestamp": "2014-03-01T23:29:30.158Z", - "appId": "/my-app", - "taskId": "my-app_0-1396592784349", - "version": "2014-04-04T06:26:23.051Z", - "alive": true - }`, - &EventHealthCheckChanged{ + testCase{ + name: "health_status_changed_event", + source: `{ + "eventType": "health_status_changed_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "appId": "/my-app", + "taskId": "my-app_0-1396592784349", + "version": "2014-04-04T06:26:23.051Z", + "alive": true +}`, + expectation: &EventHealthCheckChanged{ EventType: "health_status_changed_event", Timestamp: "2014-03-01T23:29:30.158Z", AppID: "/my-app", @@ -74,23 +88,24 @@ var testCases = map[string]*testCase{ Alive: true, }, }, - "failed_health_check_event": &testCase{ - `{ - "eventType": "failed_health_check_event", - "timestamp": "2014-03-01T23:29:30.158Z", - "appId": "/my-app", - "taskId": "my-app_0-1396592784349", - "healthCheck": { - "protocol": "HTTP", - "path": "/health", - "portIndex": 0, - "gracePeriodSeconds": 5, - "intervalSeconds": 10, - "timeoutSeconds": 10, - "maxConsecutiveFailures": 3 - } - }`, - &EventFailedHealthCheck{ + testCase{ + name: "failed_health_check_event", + source: `{ + "eventType": "failed_health_check_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "appId": "/my-app", + "taskId": "my-app_0-1396592784349", + "healthCheck": { + "protocol": "HTTP", + "path": "/health", + "portIndex": 0, + "gracePeriodSeconds": 5, + "intervalSeconds": 10, + "timeoutSeconds": 10, + "maxConsecutiveFailures": 3 + } +}`, + expectation: &EventFailedHealthCheck{ EventType: "failed_health_check_event", Timestamp: "2014-03-01T23:29:30.158Z", AppID: "/my-app", @@ -113,21 +128,22 @@ var testCases = map[string]*testCase{ }, }, }, - "deployment_info": &testCase{ - `{ - "eventType": "deployment_info", - "timestamp": "2016-07-29T08:03:52.542Z", - "plan": {}, - "currentStep": { - "actions": [ - { - "type": "ScaleApplication", - "app": "/my-app" - } - ] + testCase{ + name: "deployment_info", + source: `{ + "eventType": "deployment_info", + "timestamp": "2016-07-29T08:03:52.542Z", + "plan": {}, + "currentStep": { + "actions": [ + { + "type": "ScaleApplication", + "app": "/my-app" } - }`, - &EventDeploymentInfo{ + ] + } +}`, + expectation: &EventDeploymentInfo{ EventType: "deployment_info", Timestamp: "2016-07-29T08:03:52.542Z", Plan: &DeploymentPlan{}, @@ -192,6 +208,10 @@ func TestEventStreamConnectionErrorsForwarded(t *testing.T) { } func TestEventStreamEventsReceived(t *testing.T) { + if !assert.True(t, len(testCases) > 1, "must have at least 2 test cases to end prematurely") { + return + } + clientCfg := NewDefaultConfig() config := configContainer{ client: &clientCfg, @@ -203,67 +223,45 @@ func TestEventStreamEventsReceived(t *testing.T) { events, err := endpoint.Client.AddEventsListener(EventIDApplications | EventIDDeploymentInfo) assert.NoError(t, err) - // Publish test events - go func() { - for _, testCase := range testCases { - endpoint.Server.PublishEvent(testCase.source) - } - }() + almostAllTestCases := testCases[:len(testCases)-1] + finalTestCase := testCases[len(testCases)-1] + + // Publish all but one test event. + for _, testCase := range almostAllTestCases { + endpoint.Server.PublishEvent(testCase.source) + } - // Receive test events - for i := 0; i < len(testCases); i++ { + // Receive test events. + for i := 0; i < len(almostAllTestCases); i++ { select { case event := <-events: - assert.Equal(t, testCases[event.Name].expectation, event.Event) + tc := testCases.find(event.Name) + if !assert.NotNil(t, tc, "received unknown event: %s", event.Name) { + continue + } + assert.Equal(t, tc.expectation, event.Event) case <-time.After(eventPublishTimeout): assert.Fail(t, "did not receive event in time") - return } } -} - -func TestEventStreamPrematureTermination(t *testing.T) { - if !assert.True(t, len(testCases) > 1, "must have at least 2 test cases to end prematurely") { - return - } - clientCfg := NewDefaultConfig() - config := configContainer{ - client: &clientCfg, - } - config.client.EventsTransport = EventsTransportSSE - endpoint := newFakeMarathonEndpoint(t, &config) - defer endpoint.Close() - - events, err := endpoint.Client.AddEventsListener(EventIDApplications | EventIDDeploymentInfo) - if !assert.NoError(t, err) { - return - } - - // Publish test events - go func() { - for _, testCase := range testCases { - endpoint.Server.PublishEvent(testCase.source) - } - }() - - // Receive single event - select { - case <-events: - case <-time.After(eventPublishTimeout): - assert.Fail(t, "did not receive event in time") - return - } + // Publish last test event that we do not intend to consume anymore. + endpoint.Server.PublishEvent(finalTestCase.source) // Give event stream some time to buffer another event. time.Sleep(eventPublishTimeout) // Trigger done channel closure. endpoint.Client.RemoveEventsListener(events) + + // Give pending goroutine time to consume done signal. + time.Sleep(eventPublishTimeout) + + // Validate that channel is closed. select { - case <-events: - assert.Fail(t, "should not have received additional events") + case _, more := <-events: + assert.False(t, more, "should not have received another event") default: - // All good. + assert.Fail(t, "channel was not closed") } }