Skip to content

Commit

Permalink
Fail Stream Replicator on startup if source or target isn't reachable (
Browse files Browse the repository at this point in the history
…closes #182)
  • Loading branch information
TiganeteaRobert committed Aug 23, 2022
1 parent 9b219a0 commit b5f0154
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 102 deletions.
8 changes: 7 additions & 1 deletion pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynam
// TODO: See if the client name can be reused to survive same node reboots
name := uuid.NewV4().String()

// test the connection to kinesis by trying to make an API call
_, err := kinesisClient.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName})
if err != nil {
return nil, err
}

k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config)
if err != nil {
return nil, errors.Wrap(err, "Failed to create Kinsumer client")
Expand Down Expand Up @@ -243,7 +249,7 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
case <-time.After(10 * time.Second):
// Append errors and crash
multierror.Append(kinesisPullErr, errors.Errorf("wg.Wait() took too long, forcing app close."))
ks.log.WithFields(log.Fields{"error": err}).Fatal(err)
ks.log.WithFields(log.Fields{"error": kinesisPullErr}).Fatal(kinesisPullErr)
}

// Return kinesisPullErr if we have one
Expand Down
33 changes: 4 additions & 29 deletions pkg/source/kinesis/kinesis_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,44 +65,19 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) {
assert.Nil(err)
}

// newKinesisSourceWithInterfaces should fail if we can't reach Kinesis and DDB, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) {
// TestNewKinesisSourceWithInterfaces_ConnectionCheck tests that the Kinesis source fails on start-up if the connection to Kinesis fails
func TestNewKinesisSourceWithInterfaces_ConnectionCheck(t *testing.T) {
// Unlike the success test, we don't require anything to exist for this one
assert := assert.New(t)

// Set up localstack resources
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil)
_, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil)

assert.Nil(&kinesisSource{}, source)
assert.NotNil(err)
}
*/

// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change.
func TestKinesisSource_ReadFailure_NoResources(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID())

err = source.Read(nil)
assert.NotNil(err)
if err != nil {
assert.Equal("Failed to start Kinsumer client: error describing table fake-name_checkpoints: ResourceNotFoundException: Cannot do operations on a non-existent table", err.Error())
assert.Equal("ResourceNotFoundException: Stream nonexistent-stream under account 000000000000 not found.", err.Error())
}
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

sub := client.SubscriptionInProject(subscriptionID, projectID)
exists, err := sub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "Connection to PubSub failed")
}
if !exists {
return nil, errors.New("Connection to PubSub failed, subscription does not exist")
}

return &pubSubSource{
projectID: projectID,
client: client,
Expand Down
35 changes: 31 additions & 4 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,22 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) {
}
}

// newPubSubSource_Failure should fail if we can't reach PubSub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
// newPubSubSource_Failure should fail if we can't reach PubSub
func TestNewPubSubSource_Failure(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
assert := assert.New(t)

srv, _ := testutil.InitMockPubsubServer(8010, nil, t)
defer srv.Close()

pubsubSource, err := newPubSubSource(10, "nonexistent-project", "nonexistent-subscription")
assert.NotNil(err)
assert.Nil(pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`)
}
*/

// TestNewPubSubSource_Success tests the typical case of creating a new pubsub source.
func TestNewPubSubSource_Success(t *testing.T) {
Expand All @@ -86,7 +88,8 @@ func TestNewPubSubSource_Success(t *testing.T) {
}
assert := assert.New(t)

testutil.InitMockPubsubServer(8010, nil, t)
srv, _ := testutil.InitMockPubsubServer(8010, nil, t)
defer srv.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub")
assert.Nil(err)
Expand Down Expand Up @@ -132,6 +135,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
assert.Equal(expected, msgDatas)
}

// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error
func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
srv.Close()
conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`)
}

// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name
func TestPubSubSource_SubFailWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`)
}

// TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others
func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) {
assert := assert.New(t)
Expand Down
27 changes: 22 additions & 5 deletions pkg/source/sqs/sqs_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

// TestNewSqsSource_ConnectionCheck tests that the SQS source fails on start-up if the connection to SQS fails
func TestNewSqsSource_ConnectionCheck(t *testing.T) {
assert := assert.New(t)

target, err := configFunction(&configuration{
QueueName: "not-exists",
Region: testutil.AWSLocalstackRegion,
RoleARN: `arn:aws:sqs:us-east-1:00000000000:not-exists`,
ConcurrentWrites: 15,
})
assert.Nil(target)
assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors")
}

// func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) {
func TestNewSQSSourceWithInterfaces_Success(t *testing.T) {
if testing.Short() {
Expand All @@ -51,8 +65,7 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) {
assert.Nil(err)
}

// newSQSSourceWithInterfaces should fail if we can't reach SQS, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
// newSQSSourceWithInterfaces should fail if we can't reach SQS
func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {
// Unlike the success test, we don't require anything to exist for this one
assert := assert.New(t)
Expand All @@ -61,10 +74,14 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {

source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue")

assert.Nil(source)
assert.NotNil(err)
assert.NotNil(source)
assert.Nil(err)

err = source.Read(nil)
if err != nil {
assert.Equal(err.Error(), "Failed to get SQS queue URL: AWS.SimpleQueueService.NonExistentQueue: AWS.SimpleQueueService.NonExistentQueue; see the SQS docs.\n\tstatus code: 400, request id: 00000000-0000-0000-0000-000000000000")
}
}
*/

// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change.
func TestSQSSource_ReadFailure(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) {
// If none is specified, it will retry indefinitely until the context times out, which hides the actual error message
// To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry

// get the runtime information of the event hub in order to check the connection
_, err = hub.GetRuntimeInformation(context.Background())
if err != nil {
return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err)
}

return newEventHubTargetWithInterfaces(hub, cfg), err
}

Expand Down
47 changes: 17 additions & 30 deletions pkg/target/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,56 +364,43 @@ func TestWriteFailure(t *testing.T) {
assert.Nil(twres.Invalid)
}

// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials.
func TestNewEventHubTarget_KeyValue(t *testing.T) {
// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values.
func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Key and Value
t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")

tgt, err := newEventHubTarget(&cfg)
assert.Nil(err)
assert.NotNil(tgt)
assert.NotNil(err)
if err != nil {
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error())
}
assert.Nil(tgt)
}

// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials.
func TestNewEventHubTarget_ConnString(t *testing.T) {
func TestNewEventHubTarget_ConnStringErr(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Connection String

t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake")

tgt, err := newEventHubTarget(&cfg)
assert.Nil(err)
assert.NotNil(tgt)
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`)
assert.Nil(tgt)
}

// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values.
func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) {
func TestNewEventHubTarget_ConnVarsErr(t *testing.T) {
assert := assert.New(t)

t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")

tgt, err := newEventHubTarget(&cfg)
assert.NotNil(err)
if err != nil {
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error())
}
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`)
assert.Nil(tgt)
}

// NewEventHubTarget should fail if we can't reach EventHub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
// Note that when we do so, the above tests will need to be changed to use some kind of mock
/*
func TestNewEventHubTarget_Failure(t *testing.T) {
// NewEventHubTarget should fail if we can't reach EventHub
func TestNewEventHubTarget_NoVars(t *testing.T) {
assert := assert.New(t)
// Test that we can initialise a client with Key and Value
t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")
tgt, err := newEventHubTarget(&cfg)
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error())
assert.Nil(tgt)
}
*/
21 changes: 17 additions & 4 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,24 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp
transport.TLSClientConfig = tlsConfig
}

client := &http.Client{
Transport: transport,
Timeout: time.Duration(requestTimeout) * time.Second,
}

// send a HEAD request to the URL to check the connection
request, err := http.NewRequest("HEAD", httpURL, nil)
if err != nil {
return nil, errors.Wrap(err, `Error creating HEAD request`)
}
resp, err := client.Do(request)
if err != nil {
return nil, errors.Wrap(err, `Connection to host error`)
}
defer resp.Body.Close()

return &HTTPTarget{
client: &http.Client{
Transport: transport,
Timeout: time.Duration(requestTimeout) * time.Second,
},
client: client,
httpURL: httpURL,
byteLimit: byteLimit,
contentType: contentType,
Expand Down
Loading

0 comments on commit b5f0154

Please sign in to comment.