-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fail Stream Replicator on startup if source or target isn't reachable #183
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
"os" | ||
"path/filepath" | ||
"reflect" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
|
@@ -51,8 +52,7 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) { | |
assert.Nil(err) | ||
} | ||
|
||
// newSQSSourceWithInterfaces should fail if we can't reach SQS, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 | ||
/* | ||
// newSQSSourceWithInterfaces should fail if we can't reach SQS | ||
func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { | ||
// Unlike the success test, we don't require anything to exist for this one | ||
assert := assert.New(t) | ||
|
@@ -61,10 +61,12 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { | |
|
||
source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue") | ||
|
||
assert.Nil(source) | ||
assert.NotNil(err) | ||
assert.NotNil(source) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the rationale for doing this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right but the issue this PR is for is to establish behaviour where functions like this one fail when we can't connect. The test is meant to illustrate that desired behaviour - so the test should stay as it was and the behaviour of the function should change in this PR |
||
assert.Nil(err) | ||
|
||
err = source.Read(nil) | ||
assert.True(strings.HasPrefix(err.Error(), `Failed to get SQS queue URL`)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If err is nil this will panic - see the rest of the tests in the project on (on master at least) for pattern that avoids this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
} | ||
*/ | ||
|
||
// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change. | ||
func TestSQSSource_ReadFailure(t *testing.T) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -364,32 +364,6 @@ func TestWriteFailure(t *testing.T) { | |
assert.Nil(twres.Invalid) | ||
} | ||
|
||
// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials. | ||
func TestNewEventHubTarget_KeyValue(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
// Test that we can initialise a client with Key and Value | ||
t.Setenv("EVENTHUB_KEY_NAME", "fake") | ||
t.Setenv("EVENTHUB_KEY_VALUE", "fake") | ||
|
||
tgt, err := newEventHubTarget(&cfg) | ||
assert.Nil(err) | ||
assert.NotNil(tgt) | ||
} | ||
|
||
// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials. | ||
func TestNewEventHubTarget_ConnString(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
// Test that we can initialise a client with Connection String | ||
|
||
t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake") | ||
|
||
tgt, err := newEventHubTarget(&cfg) | ||
assert.Nil(err) | ||
assert.NotNil(tgt) | ||
} | ||
|
||
// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. | ||
func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { | ||
assert := assert.New(t) | ||
|
@@ -402,18 +376,22 @@ func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { | |
assert.Nil(tgt) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test was removed as the same behaviour is tested in the unit test above it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't look like the same behaviour to me... There are two ways to authenticate against EH - connection string or key value. In the scenario where no valid combination of the required env vars for one of the two methods is present, the client doesn't provide a good debugging experience - so we manually check this in the code. The existing unit tests are for that check. If the correct env vars are provided, however, we can still fail to connect to the client - it might be unavailable, the credentials provided might be incorrect, etc. So another test should be needed to check this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’m having a hard time understanding the issue here, it seems to me that the test that I deleted was not even correct after all. It sets env vars for I might be missing something here so please if you could provide a bit more context on this, or even book a quick chat about it it would be really helpful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking a second look, you're right, the different possible failures are indeed accounted for. I think I just looked in the wrong place and confused myself - these tests are indeed fine. |
||
// NewEventHubTarget should fail if we can't reach EventHub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 | ||
// Note that when we do so, the above tests will need to be changed to use some kind of mock | ||
/* | ||
func TestNewEventHubTarget_Failure(t *testing.T) { | ||
func TestNewEventHubTarget_ConnStringErr(t *testing.T) { | ||
assert := assert.New(t) | ||
t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake") | ||
|
||
tgt, err := newEventHubTarget(&cfg) | ||
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) | ||
assert.Nil(tgt) | ||
} | ||
|
||
func TestNewEventHubTarget_ConnVarsErr(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
// Test that we can initialise a client with Key and Value | ||
t.Setenv("EVENTHUB_KEY_NAME", "fake") | ||
t.Setenv("EVENTHUB_KEY_VALUE", "fake") | ||
|
||
tgt, err := newEventHubTarget(&cfg) | ||
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) | ||
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) | ||
assert.Nil(tgt) | ||
} | ||
*/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,10 +119,10 @@ func TestAddHeadersToRequest(t *testing.T) { | |
func TestNewHTTPTarget(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true) | ||
httpTarget, err := newHTTPTarget("http://wrong-url-12345678.com", 5, 1048576, "application/json", "", "", "", "", "", "", true) | ||
|
||
assert.Nil(err) | ||
assert.NotNil(httpTarget) | ||
assert.EqualError(err, `Connection to host error: Head "http://wrong-url-12345678.com": dial tcp: lookup wrong-url-12345678.com: no such host`) | ||
assert.Nil(httpTarget) | ||
|
||
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true) | ||
|
||
|
@@ -145,6 +145,8 @@ func TestHttpWrite_Simple(t *testing.T) { | |
|
||
var results [][]byte | ||
wg := sync.WaitGroup{} | ||
// add to waitGroup for connection check HEAD request | ||
wg.Add(1) | ||
server := createTestServer(&results, &wg) | ||
defer server.Close() | ||
|
||
|
@@ -166,8 +168,8 @@ func TestHttpWrite_Simple(t *testing.T) { | |
|
||
assert.Nil(err1) | ||
assert.Equal(501, len(writeResult.Sent)) | ||
assert.Equal(501, len(results)) | ||
for _, result := range results { | ||
assert.Equal(502, len(results)) | ||
for _, result := range results[1:] { | ||
assert.Equal("Hello Server!!", string(result)) | ||
} | ||
|
||
|
@@ -179,6 +181,7 @@ func TestHttpWrite_Concurrent(t *testing.T) { | |
|
||
var results [][]byte | ||
wg := sync.WaitGroup{} | ||
wg.Add(1) | ||
server := createTestServer(&results, &wg) | ||
defer server.Close() | ||
|
||
|
@@ -209,51 +212,20 @@ func TestHttpWrite_Concurrent(t *testing.T) { | |
|
||
wg.Wait() | ||
|
||
assert.Equal(10, len(results)) | ||
for _, result := range results { | ||
assert.Equal(11, len(results)) | ||
for _, result := range results[1:] { | ||
assert.Equal("Hello Server!!", string(result)) | ||
} | ||
|
||
assert.Equal(int64(10), ackOps) | ||
} | ||
|
||
func TestHttpWrite_Failure(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test was intended for a separate case to connection error - providing a bad path was just an easy way to achieve it. It's to test the behaviour of stream replicator when requests fail. Rather than removing a test for this case, we should amend the test to fail some other way. For example, a malformed request body would achieve it. Or, we could add a case to the http server to respond to a given path with a success for the HEAD request, but a failure for all other requests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great idea with the path, I've added an exception in the http client to look for the |
||
assert := assert.New(t) | ||
|
||
var results [][]byte | ||
wg := sync.WaitGroup{} | ||
server := createTestServer(&results, &wg) | ||
defer server.Close() | ||
|
||
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
var ackOps int64 | ||
ackFunc := func() { | ||
atomic.AddInt64(&ackOps, 1) | ||
} | ||
|
||
messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc) | ||
|
||
writeResult, err1 := target.Write(messages) | ||
|
||
assert.NotNil(err1) | ||
if err1 != nil { | ||
assert.Regexp("Error sending http request: 10 errors occurred:.*", err1.Error()) | ||
} | ||
|
||
assert.Equal(10, len(writeResult.Failed)) | ||
assert.Nil(writeResult.Sent) | ||
assert.Nil(writeResult.Oversized) | ||
} | ||
|
||
func TestHttpWrite_Oversized(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
var results [][]byte | ||
wg := sync.WaitGroup{} | ||
wg.Add(1) | ||
server := createTestServer(&results, &wg) | ||
defer server.Close() | ||
|
||
|
@@ -278,8 +250,8 @@ func TestHttpWrite_Oversized(t *testing.T) { | |
assert.Nil(err1) | ||
assert.Equal(10, len(writeResult.Sent)) | ||
assert.Equal(1, len(writeResult.Oversized)) | ||
assert.Equal(10, len(results)) | ||
for _, result := range results { | ||
assert.Equal(11, len(results)) | ||
for _, result := range results[1:] { | ||
assert.Equal("Hello Server!!", string(result)) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,6 +150,30 @@ func TestPubSubTarget_WriteSuccessWithMocks(t *testing.T) { | |
assert.Equal(int64(10), ackOps) | ||
} | ||
|
||
// TestPubSubTarget_ConnCheckErrWithMocks unit tests the Pubsub target initialization with connection error | ||
func TestPubSubTarget_ConnCheckErrWithMocks(t *testing.T) { | ||
assert := assert.New(t) | ||
srv, conn := testutil.InitMockPubsubServer(8563, nil, t) | ||
srv.Close() | ||
conn.Close() | ||
|
||
pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) | ||
assert.Nil(pubsubTarget) | ||
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) | ||
} | ||
|
||
// TestPubSubTarget_TopicFailWithMocks unit tests the Pubsub target initialization with wrong topic name | ||
func TestPubSubTarget_TopicFailWithMocks(t *testing.T) { | ||
assert := assert.New(t) | ||
srv, conn := testutil.InitMockPubsubServer(8563, nil, t) | ||
defer srv.Close() | ||
defer conn.Close() | ||
|
||
pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) | ||
assert.Nil(pubsubTarget) | ||
assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) | ||
} | ||
|
||
// TestPubSubTarget_WriteFailureWithMocks unit tests the unhappy path for PubSub target | ||
func TestPubSubTarget_WriteFailureWithMocks(t *testing.T) { | ||
assert := assert.New(t) | ||
|
@@ -262,17 +286,13 @@ func TestNewPubSubTarget_Success(t *testing.T) { | |
assert.IsType(PubSubTarget{}, *pubsubTarget) | ||
} | ||
|
||
// TestnewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub | ||
// Commented out as this behaviour is not currently instrumented. | ||
// This test serves to illustrate the desired behaviour for this issue: https://github.com/snowplow-devops/stream-replicator/issues/151 | ||
/* | ||
func TestnewPubSubTarget_Failure(t *testing.T) { | ||
// TestNewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub | ||
func TestNewPubSubTarget_Failure(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
pubsubTarget, err := newPubSubTarget(`nonexistent-project`, `nonexistent-topic`) | ||
|
||
// TODO: Test for the actual error we expect, when we have instrumented failing fast | ||
assert.NotNil(err) | ||
assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Damn, wish I knew about |
||
assert.Nil(pubsubTarget) | ||
} | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - would be nice to check for the error message as you have done for other sources
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.