From 7da8f940a1155398341056e276cfad9678018634 Mon Sep 17 00:00:00 2001 From: Marc Abramowitz Date: Wed, 30 May 2018 11:51:09 -0700 Subject: [PATCH] Test for goroutine leaks Add goroutine leak testing to tests to probe for goroutine leakage, as mentioned in https://github.com/gambol99/go-marathon/issues/208 --- subscription_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/subscription_test.go b/subscription_test.go index abc4c08..bb97aca 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -17,11 +17,14 @@ limitations under the License. package marathon import ( + "fmt" "net" "net/http" + "runtime" "testing" "time" + "github.com/fortytw2/leaktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -272,6 +275,8 @@ var testCases = testCaseList{ } func TestSubscriptions(t *testing.T) { + defer leaktest.Check(t)() + endpoint := newFakeMarathonEndpoint(t, nil) defer endpoint.Close() @@ -283,6 +288,8 @@ func TestSubscriptions(t *testing.T) { } func TestSubscribe(t *testing.T) { + defer leaktest.Check(t)() + endpoint := newFakeMarathonEndpoint(t, nil) defer endpoint.Close() @@ -291,6 +298,8 @@ func TestSubscribe(t *testing.T) { } func TestUnsubscribe(t *testing.T) { + defer leaktest.Check(t)() + endpoint := newFakeMarathonEndpoint(t, nil) defer endpoint.Close() @@ -299,6 +308,8 @@ func TestUnsubscribe(t *testing.T) { } func TestSSEWithGlobalTimeout(t *testing.T) { + defer leaktest.Check(t)() + clientCfg := NewDefaultConfig() clientCfg.HTTPSSEClient = &http.Client{ Timeout: 1 * time.Second, @@ -314,7 +325,59 @@ func TestSSEWithGlobalTimeout(t *testing.T) { assert.Error(t, err) } +func TestSSEWithManyListeners(t *testing.T) { + defer leaktest.Check(t)() + + clientCfg := NewDefaultConfig() + clientCfg.HTTPSSEClient = &http.Client{} + config := configContainer{ + client: &clientCfg, + } + config.client.EventsTransport = EventsTransportSSE + endpoint := newFakeMarathonEndpoint(t, &config) + defer endpoint.Close() + + for i := 0; i < 1000; i++ { + _, err := endpoint.Client.AddEventsListener(EventIDApplications) + require.NoError(t, err) + } +} + +func TestWhenEventsAreNotConsumed(t *testing.T) { + var startNumGoroutines, endNumGoroutines int + + startNumGoroutines = runtime.NumGoroutine() + + defer leaktest.Check(t)() + + clientCfg := NewDefaultConfig() + config := configContainer{ + client: &clientCfg, + } + config.client.EventsTransport = EventsTransportSSE + endpoint := newFakeMarathonEndpoint(t, &config) + defer endpoint.Close() + + _, err := endpoint.Client.AddEventsListener(EventIDApplications | EventIDDeploymentInfo | EventIDDeploymentStepSuccess) + assert.NoError(t, err) + + // Give it a bit of time so that the subscription can be set up + time.Sleep(SSEConnectWaitTime) + + // Publish a whole bunch of events + for i := 0; i < 1000; i++ { + for _, testCase := range testCases { + endpoint.Server.PublishEvent(testCase.source) + } + } + + endNumGoroutines = runtime.NumGoroutine() + fmt.Printf("startNumGoroutines = %d; endNumGoroutines = %d\n", startNumGoroutines, endNumGoroutines) +} + func TestEventStreamEventsReceived(t *testing.T) { + defer leaktest.Check(t)() + require.True(t, len(testCases) > 1, "must have at least 2 test cases to end prematurely") clientCfg := NewDefaultConfig() @@ -375,6 +438,8 @@ func TestEventStreamEventsReceived(t *testing.T) { } func TestConnectToSSESuccess(t *testing.T) { + defer leaktest.Check(t)() + clientCfg := NewDefaultConfig() // Use non-existent address as first cluster member clientCfg.URL = "http://127.0.0.1:11111" @@ -396,6 +461,8 @@ func TestConnectToSSESuccess(t *testing.T) { } func TestConnectToSSEFailure(t *testing.T) { + defer leaktest.Check(t)() + clientCfg := NewDefaultConfig() clientCfg.EventsTransport = EventsTransportSSE config := configContainer{client: &clientCfg} @@ -413,6 +480,8 @@ func TestConnectToSSEFailure(t *testing.T) { } func TestRegisterSEESubscriptionReconnectsStreamOnError(t *testing.T) { + defer leaktest.Check(t)() + clientCfg := NewDefaultConfig() clientCfg.HTTPSSEClient = &http.Client{ Transport: &http.Transport{