From b5f0154d81c75e01042d652813663cd680888238 Mon Sep 17 00:00:00 2001 From: TiganeteaRobert Date: Wed, 3 Aug 2022 16:10:57 +0300 Subject: [PATCH] Fail Stream Replicator on startup if source or target isn't reachable (closes #182) --- pkg/source/kinesis/kinesis_source.go | 8 +++- pkg/source/kinesis/kinesis_source_test.go | 33 ++------------- pkg/source/pubsub/pubsub_source.go | 9 ++++ pkg/source/pubsub/pubsub_source_test.go | 35 ++++++++++++++-- pkg/source/sqs/sqs_source_test.go | 27 +++++++++--- pkg/target/eventhub.go | 6 +++ pkg/target/eventhub_test.go | 47 ++++++++------------- pkg/target/http.go | 21 ++++++++-- pkg/target/http_test.go | 50 ++++++++++++++--------- pkg/target/kafka_test.go | 18 ++++++++ pkg/target/kinesis_test.go | 13 ++++++ pkg/target/pubsub.go | 9 ++++ pkg/target/pubsub_test.go | 41 +++++++++++++++---- pkg/target/sqs_test.go | 9 ++++ 14 files changed, 224 insertions(+), 102 deletions(-) diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index 70a76490..e8da5dcb 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -157,6 +157,12 @@ func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynam // TODO: See if the client name can be reused to survive same node reboots name := uuid.NewV4().String() + // test the connection to kinesis by trying to make an API call + _, err := kinesisClient.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName}) + if err != nil { + return nil, err + } + k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config) if err != nil { return nil, errors.Wrap(err, "Failed to create Kinsumer client") @@ -243,7 +249,7 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { case <-time.After(10 * time.Second): // Append errors and crash multierror.Append(kinesisPullErr, errors.Errorf("wg.Wait() took too long, forcing app close.")) - ks.log.WithFields(log.Fields{"error": err}).Fatal(err) + ks.log.WithFields(log.Fields{"error": kinesisPullErr}).Fatal(kinesisPullErr) } // Return kinesisPullErr if we have one diff --git a/pkg/source/kinesis/kinesis_source_test.go b/pkg/source/kinesis/kinesis_source_test.go index be6e08da..aaf22ad0 100644 --- a/pkg/source/kinesis/kinesis_source_test.go +++ b/pkg/source/kinesis/kinesis_source_test.go @@ -65,9 +65,8 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) { assert.Nil(err) } -// newKinesisSourceWithInterfaces should fail if we can't reach Kinesis and DDB, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -/* -func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) { +// TestNewKinesisSourceWithInterfaces_ConnectionCheck tests that the Kinesis source fails on start-up if the connection to Kinesis fails +func TestNewKinesisSourceWithInterfaces_ConnectionCheck(t *testing.T) { // Unlike the success test, we don't require anything to exist for this one assert := assert.New(t) @@ -75,34 +74,10 @@ func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) { kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil) + _, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil) - assert.Nil(&kinesisSource{}, source) - assert.NotNil(err) - -} -*/ - -// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change. -func TestKinesisSource_ReadFailure_NoResources(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - - assert := assert.New(t) - - kinesisClient := testutil.GetAWSLocalstackKinesisClient() - dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil) - assert.Nil(err) - assert.NotNil(source) - assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID()) - - err = source.Read(nil) - assert.NotNil(err) if err != nil { - assert.Equal("Failed to start Kinsumer client: error describing table fake-name_checkpoints: ResourceNotFoundException: Cannot do operations on a non-existent table", err.Error()) + assert.Equal("ResourceNotFoundException: Stream nonexistent-stream under account 000000000000 not found.", err.Error()) } } diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index acb10626..6b2f493b 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri return nil, errors.Wrap(err, "Failed to create PubSub client") } + sub := client.SubscriptionInProject(subscriptionID, projectID) + exists, err := sub.Exists(ctx) + if err != nil { + return nil, errors.Wrap(err, "Connection to PubSub failed") + } + if !exists { + return nil, errors.New("Connection to PubSub failed, subscription does not exist") + } + return &pubSubSource{ projectID: projectID, client: client, diff --git a/pkg/source/pubsub/pubsub_source_test.go b/pkg/source/pubsub/pubsub_source_test.go index 8de38102..4cd020db 100644 --- a/pkg/source/pubsub/pubsub_source_test.go +++ b/pkg/source/pubsub/pubsub_source_test.go @@ -64,20 +64,22 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) { } } -// newPubSubSource_Failure should fail if we can't reach PubSub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -/* +// newPubSubSource_Failure should fail if we can't reach PubSub func TestNewPubSubSource_Failure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } assert := assert.New(t) + srv, _ := testutil.InitMockPubsubServer(8010, nil, t) + defer srv.Close() + pubsubSource, err := newPubSubSource(10, "nonexistent-project", "nonexistent-subscription") assert.NotNil(err) assert.Nil(pubsubSource) // This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem. + assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`) } -*/ // TestNewPubSubSource_Success tests the typical case of creating a new pubsub source. func TestNewPubSubSource_Success(t *testing.T) { @@ -86,7 +88,8 @@ func TestNewPubSubSource_Success(t *testing.T) { } assert := assert.New(t) - testutil.InitMockPubsubServer(8010, nil, t) + srv, _ := testutil.InitMockPubsubServer(8010, nil, t) + defer srv.Close() pubsubSource, err := newPubSubSource(10, "project-test", "test-sub") assert.Nil(err) @@ -132,6 +135,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) { assert.Equal(expected, msgDatas) } +// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error +func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + srv.Close() + conn.Close() + + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong") + assert.Nil(pubsubSource) + assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) +} + +// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name +func TestPubSubSource_SubFailWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() + + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong") + assert.Nil(pubsubSource) + assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`) +} + // TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) { assert := assert.New(t) diff --git a/pkg/source/sqs/sqs_source_test.go b/pkg/source/sqs/sqs_source_test.go index 84aa4b77..22a6407f 100644 --- a/pkg/source/sqs/sqs_source_test.go +++ b/pkg/source/sqs/sqs_source_test.go @@ -30,6 +30,20 @@ func TestMain(m *testing.M) { os.Exit(exitVal) } +// TestNewSqsSource_ConnectionCheck tests that the SQS source fails on start-up if the connection to SQS fails +func TestNewSqsSource_ConnectionCheck(t *testing.T) { + assert := assert.New(t) + + target, err := configFunction(&configuration{ + QueueName: "not-exists", + Region: testutil.AWSLocalstackRegion, + RoleARN: `arn:aws:sqs:us-east-1:00000000000:not-exists`, + ConcurrentWrites: 15, + }) + assert.Nil(target) + assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors") +} + // func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) { func TestNewSQSSourceWithInterfaces_Success(t *testing.T) { if testing.Short() { @@ -51,8 +65,7 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) { assert.Nil(err) } -// newSQSSourceWithInterfaces should fail if we can't reach SQS, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -/* +// newSQSSourceWithInterfaces should fail if we can't reach SQS func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { // Unlike the success test, we don't require anything to exist for this one assert := assert.New(t) @@ -61,10 +74,14 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue") - assert.Nil(source) - assert.NotNil(err) + assert.NotNil(source) + assert.Nil(err) + + err = source.Read(nil) + if err != nil { + assert.Equal(err.Error(), "Failed to get SQS queue URL: AWS.SimpleQueueService.NonExistentQueue: AWS.SimpleQueueService.NonExistentQueue; see the SQS docs.\n\tstatus code: 400, request id: 00000000-0000-0000-0000-000000000000") + } } -*/ // TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change. func TestSQSSource_ReadFailure(t *testing.T) { diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index 62a31abf..cd576cf2 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) { // If none is specified, it will retry indefinitely until the context times out, which hides the actual error message // To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry + // get the runtime information of the event hub in order to check the connection + _, err = hub.GetRuntimeInformation(context.Background()) + if err != nil { + return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err) + } + return newEventHubTargetWithInterfaces(hub, cfg), err } diff --git a/pkg/target/eventhub_test.go b/pkg/target/eventhub_test.go index 611c5c18..121d294e 100644 --- a/pkg/target/eventhub_test.go +++ b/pkg/target/eventhub_test.go @@ -364,56 +364,43 @@ func TestWriteFailure(t *testing.T) { assert.Nil(twres.Invalid) } -// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials. -func TestNewEventHubTarget_KeyValue(t *testing.T) { +// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. +func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { assert := assert.New(t) - // Test that we can initialise a client with Key and Value - t.Setenv("EVENTHUB_KEY_NAME", "fake") - t.Setenv("EVENTHUB_KEY_VALUE", "fake") - tgt, err := newEventHubTarget(&cfg) - assert.Nil(err) - assert.NotNil(tgt) + assert.NotNil(err) + if err != nil { + assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) + } + assert.Nil(tgt) } -// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials. -func TestNewEventHubTarget_ConnString(t *testing.T) { +func TestNewEventHubTarget_ConnStringErr(t *testing.T) { assert := assert.New(t) - - // Test that we can initialise a client with Connection String - t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake") tgt, err := newEventHubTarget(&cfg) - assert.Nil(err) - assert.NotNil(tgt) + assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) + assert.Nil(tgt) } -// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. -func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { +func TestNewEventHubTarget_ConnVarsErr(t *testing.T) { assert := assert.New(t) + t.Setenv("EVENTHUB_KEY_NAME", "fake") + t.Setenv("EVENTHUB_KEY_VALUE", "fake") + tgt, err := newEventHubTarget(&cfg) - assert.NotNil(err) - if err != nil { - assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) - } + assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) assert.Nil(tgt) } -// NewEventHubTarget should fail if we can't reach EventHub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -// Note that when we do so, the above tests will need to be changed to use some kind of mock -/* -func TestNewEventHubTarget_Failure(t *testing.T) { +// NewEventHubTarget should fail if we can't reach EventHub +func TestNewEventHubTarget_NoVars(t *testing.T) { assert := assert.New(t) - // Test that we can initialise a client with Key and Value - t.Setenv("EVENTHUB_KEY_NAME", "fake") - t.Setenv("EVENTHUB_KEY_VALUE", "fake") - tgt, err := newEventHubTarget(&cfg) assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) assert.Nil(tgt) } -*/ diff --git a/pkg/target/http.go b/pkg/target/http.go index 28f8d9a3..96cf1b7d 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -108,11 +108,24 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp transport.TLSClientConfig = tlsConfig } + client := &http.Client{ + Transport: transport, + Timeout: time.Duration(requestTimeout) * time.Second, + } + + // send a HEAD request to the URL to check the connection + request, err := http.NewRequest("HEAD", httpURL, nil) + if err != nil { + return nil, errors.Wrap(err, `Error creating HEAD request`) + } + resp, err := client.Do(request) + if err != nil { + return nil, errors.Wrap(err, `Connection to host error`) + } + defer resp.Body.Close() + return &HTTPTarget{ - client: &http.Client{ - Transport: transport, - Timeout: time.Duration(requestTimeout) * time.Second, - }, + client: client, httpURL: httpURL, byteLimit: byteLimit, contentType: contentType, diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index 7ab3eb86..a15e7d3f 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -32,6 +32,13 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se panic(err) } mutex.Lock() + if req.URL.Path == `/error` { + if req.Method == http.MethodHead { + *results = append(*results, data) + } else { + w.WriteHeader(500) + } + } *results = append(*results, data) mutex.Unlock() defer waitgroup.Done() @@ -119,10 +126,10 @@ func TestAddHeadersToRequest(t *testing.T) { func TestNewHTTPTarget(t *testing.T) { assert := assert.New(t) - httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true) + httpTarget, err := newHTTPTarget("http://wrong-url-12345678.com", 5, 1048576, "application/json", "", "", "", "", "", "", true) - assert.Nil(err) - assert.NotNil(httpTarget) + assert.EqualError(err, `Connection to host error: Head "http://wrong-url-12345678.com": dial tcp: lookup wrong-url-12345678.com: no such host`) + assert.Nil(httpTarget) failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true) @@ -145,6 +152,8 @@ func TestHttpWrite_Simple(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + // add to waitGroup for connection check HEAD request + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -166,8 +175,8 @@ func TestHttpWrite_Simple(t *testing.T) { assert.Nil(err1) assert.Equal(501, len(writeResult.Sent)) - assert.Equal(501, len(results)) - for _, result := range results { + assert.Equal(502, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } @@ -179,6 +188,7 @@ func TestHttpWrite_Concurrent(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -209,23 +219,25 @@ func TestHttpWrite_Concurrent(t *testing.T) { wg.Wait() - assert.Equal(10, len(results)) - for _, result := range results { + assert.Equal(11, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } assert.Equal(int64(10), ackOps) } -func TestHttpWrite_Failure(t *testing.T) { +func TestHttpWrite_RequestFailure(t *testing.T) { assert := assert.New(t) var results [][]byte wg := sync.WaitGroup{} + // add to waitGroup for connection check HEAD request + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() - target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true) + target, err := newHTTPTarget(server.URL+`/error`, 5, 1048576, "application/json", "", "", "", "", "", "", true) if err != nil { t.Fatal(err) } @@ -235,18 +247,15 @@ func TestHttpWrite_Failure(t *testing.T) { atomic.AddInt64(&ackOps, 1) } - messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc) + messages := testutil.GetTestMessages(1, "Hello Server!!", ackFunc) + wg.Add(1) + _, err1 := target.Write(messages) - writeResult, err1 := target.Write(messages) + wg.Wait() - assert.NotNil(err1) - if err1 != nil { - assert.Regexp("Error sending http request: 10 errors occurred:.*", err1.Error()) - } + assert.EqualError(err1, "Error sending http request: 1 error occurred:\n\t* 500 Internal Server Error: \n\n") - assert.Equal(10, len(writeResult.Failed)) - assert.Nil(writeResult.Sent) - assert.Nil(writeResult.Oversized) + assert.Equal(int64(0), ackOps) } func TestHttpWrite_Oversized(t *testing.T) { @@ -254,6 +263,7 @@ func TestHttpWrite_Oversized(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -278,8 +288,8 @@ func TestHttpWrite_Oversized(t *testing.T) { assert.Nil(err1) assert.Equal(10, len(writeResult.Sent)) assert.Equal(1, len(writeResult.Oversized)) - assert.Equal(10, len(results)) - for _, result := range results { + assert.Equal(11, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } diff --git a/pkg/target/kafka_test.go b/pkg/target/kafka_test.go index 9de02a2b..1b7a452c 100644 --- a/pkg/target/kafka_test.go +++ b/pkg/target/kafka_test.go @@ -7,6 +7,7 @@ package target import ( + "strings" "sync/atomic" "testing" @@ -62,6 +63,23 @@ func SetUpMockSyncProducer(t *testing.T) (*mocks.SyncProducer, *KafkaTarget) { } } +// TestNewKafkaTarget_Failure tests that the kafka target fails on start-up if the connection to Kafka fails +func TestNewKafkaTarget_Failure(t *testing.T) { + assert := assert.New(t) + + k, err := NewKafkaTarget(&KafkaConfig{ + Brokers: "test:8080", + TopicName: "test", + ByteLimit: 1000, + }) + + assert.Nil(k) + assert.NotNil(err) + if err != nil { + assert.True(strings.HasPrefix(err.Error(), `kafka: client has run out of available brokers to talk to: dial tcp`)) + } +} + func TestKafkaTarget_AsyncWriteFailure(t *testing.T) { assert := assert.New(t) diff --git a/pkg/target/kinesis_test.go b/pkg/target/kinesis_test.go index 5d7aee8f..78b5e6e9 100644 --- a/pkg/target/kinesis_test.go +++ b/pkg/target/kinesis_test.go @@ -15,6 +15,19 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/testutil" ) +// TestNewKinesisTarget_ConnectionCheck tests that the Kinesis target fails on start-up if the connection to Kinesis fails +func TestNewKinesisTarget_ConnectionCheck(t *testing.T) { + assert := assert.New(t) + + target, err := newKinesisTarget(testutil.AWSLocalstackRegion, "00000000000", "arn:aws:kinesis:us-east-1:00000000000:stream/not-exists") + assert.Nil(target) + assert.NotNil(err) + // check that there is an error, meaning that a connection attempt to AWS was made + if err != nil { + assert.Equal("NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors", err.Error()) + } +} + func TestKinesisTarget_WriteFailure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") diff --git a/pkg/target/pubsub.go b/pkg/target/pubsub.go index 2af180ce..13ebdefd 100644 --- a/pkg/target/pubsub.go +++ b/pkg/target/pubsub.go @@ -57,6 +57,15 @@ func newPubSubTarget(projectID string, topicName string) (*PubSubTarget, error) return nil, errors.Wrap(err, "Failed to create PubSub client") } + sub := client.TopicInProject(topicName, projectID) + exists, err := sub.Exists(ctx) + if err != nil { + return nil, errors.Wrap(err, "Connection to PubSub failed") + } + if !exists { + return nil, errors.New("Connection to PubSub failed, topic does not exist") + } + return &PubSubTarget{ projectID: projectID, client: client, diff --git a/pkg/target/pubsub_test.go b/pkg/target/pubsub_test.go index 74ffeeaf..460d255b 100644 --- a/pkg/target/pubsub_test.go +++ b/pkg/target/pubsub_test.go @@ -150,6 +150,30 @@ func TestPubSubTarget_WriteSuccessWithMocks(t *testing.T) { assert.Equal(int64(10), ackOps) } +// TestPubSubTarget_ConnCheckErrWithMocks unit tests the Pubsub target initialization with connection error +func TestPubSubTarget_ConnCheckErrWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + srv.Close() + conn.Close() + + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + assert.Nil(pubsubTarget) + assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) +} + +// TestPubSubTarget_TopicFailWithMocks unit tests the Pubsub target initialization with wrong topic name +func TestPubSubTarget_TopicFailWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() + + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + assert.Nil(pubsubTarget) + assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) +} + // TestPubSubTarget_WriteFailureWithMocks unit tests the unhappy path for PubSub target func TestPubSubTarget_WriteFailureWithMocks(t *testing.T) { assert := assert.New(t) @@ -262,17 +286,16 @@ func TestNewPubSubTarget_Success(t *testing.T) { assert.IsType(PubSubTarget{}, *pubsubTarget) } -// TestnewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub -// Commented out as this behaviour is not currently instrumented. -// This test serves to illustrate the desired behaviour for this issue: https://github.com/snowplow-devops/stream-replicator/issues/151 -/* -func TestnewPubSubTarget_Failure(t *testing.T) { +// TestNewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub +func TestNewPubSubTarget_Failure(t *testing.T) { assert := assert.New(t) - pubsubTarget, err := newPubSubTarget(`nonexistent-project`, `nonexistent-topic`) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() - // TODO: Test for the actual error we expect, when we have instrumented failing fast - assert.NotNil(err) + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + + assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) assert.Nil(pubsubTarget) } -*/ diff --git a/pkg/target/sqs_test.go b/pkg/target/sqs_test.go index 159c81d9..a9da46d0 100644 --- a/pkg/target/sqs_test.go +++ b/pkg/target/sqs_test.go @@ -15,6 +15,15 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/testutil" ) +// TestNewSqsTarget_ConnectionCheck tests that the SQS target fails on start-up if the connection to SQS fails +func TestNewSqsTarget_ConnectionCheck(t *testing.T) { + assert := assert.New(t) + + target, err := newSQSTarget(testutil.AWSLocalstackRegion, "not-exists", `arn:aws:sqs:us-east-1:00000000000:not-exists`) + assert.Nil(target) + assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors") +} + func TestSQSTarget_WriteFailure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test")