Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail Stream Replicator on startup if source or target isn't reachable #183

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

sub := client.SubscriptionInProject(subscriptionID, projectID)
exists, err := sub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "Connection to PubSub failed")
}
if !exists {
return nil, errors.New("Connection to PubSub failed, subscription does not exist")
}

return &pubSubSource{
projectID: projectID,
client: client,
Expand Down
28 changes: 25 additions & 3 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) {
}
}

// newPubSubSource_Failure should fail if we can't reach PubSub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
// newPubSubSource_Failure should fail if we can't reach PubSub
func TestNewPubSubSource_Failure(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
Expand All @@ -77,7 +76,6 @@ func TestNewPubSubSource_Failure(t *testing.T) {
assert.Nil(pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - would be nice to check for the error message as you have done for other sources

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

}
*/

// TestNewPubSubSource_Success tests the typical case of creating a new pubsub source.
func TestNewPubSubSource_Success(t *testing.T) {
Expand Down Expand Up @@ -132,6 +130,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
assert.Equal(expected, msgDatas)
}

// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error
func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
srv.Close()
conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`)
}

// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name
func TestPubSubSource_SubFailWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`)
}

// TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others
func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) {
assert := assert.New(t)
Expand Down
12 changes: 7 additions & 5 deletions pkg/source/sqs/sqs_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -51,8 +52,7 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) {
assert.Nil(err)
}

// newSQSSourceWithInterfaces should fail if we can't reach SQS, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
// newSQSSourceWithInterfaces should fail if we can't reach SQS
func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {
// Unlike the success test, we don't require anything to exist for this one
assert := assert.New(t)
Expand All @@ -61,10 +61,12 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {

source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue")

assert.Nil(source)
assert.NotNil(err)
assert.NotNil(source)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the rationale for doing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As newSQSSourceWithInterfaces just returns a new sqsSource regardless of the parameters it gets and never errors, the assertion we had there was wrong.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but the purpose of the issue this PR addresses is to establish behaviour where functions like this one fail when we can't connect. The test is meant to illustrate that desired behaviour - so the test should stay as it was, and the behaviour of the function should change in this PR.

assert.Nil(err)

err = source.Read(nil)
assert.True(strings.HasPrefix(err.Error(), `Failed to get SQS queue URL`))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If err is nil this will panic - see the rest of the tests in the project (on master at least) for a pattern that avoids this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
*/

// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change.
func TestSQSSource_ReadFailure(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) {
// If none is specified, it will retry indefinitely until the context times out, which hides the actual error message
// To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry

// get the runtime information of the event hub in order to check the connection
_, err = hub.GetRuntimeInformation(context.Background())
if err != nil {
return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err)
}

return newEventHubTargetWithInterfaces(hub, cfg), err
}

Expand Down
44 changes: 11 additions & 33 deletions pkg/target/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,32 +364,6 @@ func TestWriteFailure(t *testing.T) {
assert.Nil(twres.Invalid)
}

// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials.
func TestNewEventHubTarget_KeyValue(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Key and Value
t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")

tgt, err := newEventHubTarget(&cfg)
assert.Nil(err)
assert.NotNil(tgt)
}

// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials.
func TestNewEventHubTarget_ConnString(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Connection String

t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake")

tgt, err := newEventHubTarget(&cfg)
assert.Nil(err)
assert.NotNil(tgt)
}

// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values.
func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) {
assert := assert.New(t)
Expand All @@ -402,18 +376,22 @@ func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) {
assert.Nil(tgt)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was removed as the same behaviour is tested in the unit test above it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like the same behaviour to me...

There are two ways to authenticate against EH - connection string or key value. In the scenario where no valid combination of the required env vars for one of the two methods is present, the client doesn't provide a good debugging experience - so we manually check this in the code. The existing unit tests are for that check.

If the correct env vars are provided, however, we can still fail to connect to the client - it might be unavailable, the credentials provided might be incorrect, etc. So another test should be needed to check this.

Copy link
Contributor Author

@TiganeteaRobert TiganeteaRobert Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m having a hard time understanding the issue here, it seems to me that the test that I deleted was not even correct after all. It sets env vars for EVENTHUB_KEY_NAME and EVENTHUB_KEY_VALUE and expects a No valid combination of authentication Env vars found error, which would not be the case. That error would appear if no env vars have been set, either for the connection string or the eventhub key. This possibility is tested above in TestNewEventHubTarget_CredentialsNotFound.

I might be missing something here, so if you could provide a bit more context on this, or even book a quick chat about it, that would be really helpful.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a second look, you're right, the different possible failures are indeed accounted for. I think I just looked in the wrong place and confused myself - these tests are indeed fine.

// NewEventHubTarget should fail if we can't reach EventHub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
// Note that when we do so, the above tests will need to be changed to use some kind of mock
/*
func TestNewEventHubTarget_Failure(t *testing.T) {
func TestNewEventHubTarget_ConnStringErr(t *testing.T) {
assert := assert.New(t)
t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake")

tgt, err := newEventHubTarget(&cfg)
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`)
assert.Nil(tgt)
}

func TestNewEventHubTarget_ConnVarsErr(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Key and Value
t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")

tgt, err := newEventHubTarget(&cfg)
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error())
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`)
assert.Nil(tgt)
}
*/
21 changes: 17 additions & 4 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,24 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp
transport.TLSClientConfig = tlsConfig
}

client := &http.Client{
Transport: transport,
Timeout: time.Duration(requestTimeout) * time.Second,
}

// send a HEAD request to the URL to check the connection
request, err := http.NewRequest("HEAD", httpURL, nil)
if err != nil {
return nil, errors.Wrap(err, `Error creating HEAD request`)
}
resp, err := client.Do(request)
if err != nil {
return nil, errors.Wrap(err, `Connection to host error`)
}
defer resp.Body.Close()

return &HTTPTarget{
client: &http.Client{
Transport: transport,
Timeout: time.Duration(requestTimeout) * time.Second,
},
client: client,
httpURL: httpURL,
byteLimit: byteLimit,
contentType: contentType,
Expand Down
54 changes: 13 additions & 41 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ func TestAddHeadersToRequest(t *testing.T) {
func TestNewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true)
httpTarget, err := newHTTPTarget("http://wrong-url-12345678.com", 5, 1048576, "application/json", "", "", "", "", "", "", true)

assert.Nil(err)
assert.NotNil(httpTarget)
assert.EqualError(err, `Connection to host error: Head "http://wrong-url-12345678.com": dial tcp: lookup wrong-url-12345678.com: no such host`)
assert.Nil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true)

Expand All @@ -145,6 +145,8 @@ func TestHttpWrite_Simple(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
// add to waitGroup for connection check HEAD request
wg.Add(1)
server := createTestServer(&results, &wg)
defer server.Close()

Expand All @@ -166,8 +168,8 @@ func TestHttpWrite_Simple(t *testing.T) {

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
for _, result := range results {
assert.Equal(502, len(results))
for _, result := range results[1:] {
assert.Equal("Hello Server!!", string(result))
}

Expand All @@ -179,6 +181,7 @@ func TestHttpWrite_Concurrent(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
wg.Add(1)
server := createTestServer(&results, &wg)
defer server.Close()

Expand Down Expand Up @@ -209,51 +212,20 @@ func TestHttpWrite_Concurrent(t *testing.T) {

wg.Wait()

assert.Equal(10, len(results))
for _, result := range results {
assert.Equal(11, len(results))
for _, result := range results[1:] {
assert.Equal("Hello Server!!", string(result))
}

assert.Equal(int64(10), ackOps)
}

func TestHttpWrite_Failure(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was intended for a case separate from connection errors - providing a bad path was just an easy way to achieve it.

It's to test the behaviour of stream replicator when requests fail.

Rather than removing a test for this case, we should amend the test to fail some other way.

For example, a malformed request body would achieve it. Or, we could add a case to the http server to respond to a given path with a success for the HEAD request, but a failure for all other requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea with the path, I've added an exception in the http client to look for the /error path and return a 500 error if the request does not have the HEAD method. This allows us to test that the HEAD connection test can be successful but other requests can fail and test the behaviour.

assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true)
if err != nil {
t.Fatal(err)
}

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)

writeResult, err1 := target.Write(messages)

assert.NotNil(err1)
if err1 != nil {
assert.Regexp("Error sending http request: 10 errors occurred:.*", err1.Error())
}

assert.Equal(10, len(writeResult.Failed))
assert.Nil(writeResult.Sent)
assert.Nil(writeResult.Oversized)
}

func TestHttpWrite_Oversized(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
wg.Add(1)
server := createTestServer(&results, &wg)
defer server.Close()

Expand All @@ -278,8 +250,8 @@ func TestHttpWrite_Oversized(t *testing.T) {
assert.Nil(err1)
assert.Equal(10, len(writeResult.Sent))
assert.Equal(1, len(writeResult.Oversized))
assert.Equal(10, len(results))
for _, result := range results {
assert.Equal(11, len(results))
for _, result := range results[1:] {
assert.Equal("Hello Server!!", string(result))
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/target/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func newPubSubTarget(projectID string, topicName string) (*PubSubTarget, error)
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

sub := client.TopicInProject(topicName, projectID)
exists, err := sub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "Connection to PubSub failed")
}
if !exists {
return nil, errors.New("Connection to PubSub failed, topic does not exist")
}

return &PubSubTarget{
projectID: projectID,
client: client,
Expand Down
41 changes: 32 additions & 9 deletions pkg/target/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ func TestPubSubTarget_WriteSuccessWithMocks(t *testing.T) {
assert.Equal(int64(10), ackOps)
}

// TestPubSubTarget_ConnCheckErrWithMocks unit tests the Pubsub target initialization with connection error
func TestPubSubTarget_ConnCheckErrWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
srv.Close()
conn.Close()

pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`)
assert.Nil(pubsubTarget)
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`)
}

// TestPubSubTarget_TopicFailWithMocks unit tests the Pubsub target initialization with wrong topic name
func TestPubSubTarget_TopicFailWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`)
assert.Nil(pubsubTarget)
assert.EqualError(err, `Connection to PubSub failed, topic does not exist`)
}

// TestPubSubTarget_WriteFailureWithMocks unit tests the unhappy path for PubSub target
func TestPubSubTarget_WriteFailureWithMocks(t *testing.T) {
assert := assert.New(t)
Expand Down Expand Up @@ -262,17 +286,16 @@ func TestNewPubSubTarget_Success(t *testing.T) {
assert.IsType(PubSubTarget{}, *pubsubTarget)
}

// TestnewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub
// Commented out as this behaviour is not currently instrumented.
// This test serves to illustrate the desired behaviour for this issue: https://github.com/snowplow-devops/stream-replicator/issues/151
/*
func TestnewPubSubTarget_Failure(t *testing.T) {
// TestNewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub
func TestNewPubSubTarget_Failure(t *testing.T) {
assert := assert.New(t)

pubsubTarget, err := newPubSubTarget(`nonexistent-project`, `nonexistent-topic`)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

// TODO: Test for the actual error we expect, when we have instrumented failing fast
assert.NotNil(err)
pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`)

assert.EqualError(err, `Connection to PubSub failed, topic does not exist`)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn, wish I knew about EqualError before I spent ages wrapping stuff across the project in if statements 🤦

assert.Nil(pubsubTarget)
}
*/