diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 79ad56aca..b1292eb9b 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -3,4 +3,4 @@ on: pull_request: jobs: ci: - uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@lint + uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@codeconv-pr diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 24577668a..b71a22dc6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -43,3 +43,12 @@ jobs: - run: make test_stress if: ${{ inputs.stress-tests }} + codecov: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@master + - uses: actions/setup-go@v2 + with: + go-version: '^1.21.1' + - run: make test_codecov + - uses: codecov/codecov-action@v3 diff --git a/components/requestreply/backend_pubsub.go b/components/requestreply/backend_pubsub.go index d5352b9e2..d6fb90a5d 100644 --- a/components/requestreply/backend_pubsub.go +++ b/components/requestreply/backend_pubsub.go @@ -7,6 +7,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" ) @@ -97,20 +98,22 @@ func (p *PubSubBackendConfig) setDefaults() { } func (p *PubSubBackendConfig) Validate() error { + var err error + if p.Publisher == nil { - return errors.New("publisher cannot be nil") + err = multierror.Append(err, errors.New("publisher cannot be nil")) } if p.SubscriberConstructor == nil { - return errors.New("subscriber constructor cannot be nil") + err = multierror.Append(err, errors.New("subscriber constructor cannot be nil")) } if p.GeneratePublishTopic == nil { - return errors.New("GeneratePublishTopic cannot be nil") + err = multierror.Append(err, errors.New("GeneratePublishTopic cannot be nil")) } if p.GenerateSubscribeTopic == nil { - return errors.New("GenerateSubscribeTopic cannot be nil") + err = multierror.Append(err, errors.New("GenerateSubscribeTopic cannot be nil")) } - return nil + return err } func (p PubSubBackend[Result]) ListenForNotifications( diff --git a/components/requestreply/requestreply_test.go b/components/requestreply/requestreply_test.go index 690ebd290..a323b172f 100644 --- a/components/requestreply/requestreply_test.go +++ b/components/requestreply/requestreply_test.go @@ -28,6 +28,7 @@ type TestServices[Result any] struct { CommandProcessor *cqrs.CommandProcessor RequestReplyBackend *requestreply.PubSubBackend[Result] + BackendConfig requestreply.PubSubBackendConfig } type TestServicesConfig struct { @@ -50,47 +51,48 @@ func NewTestServices[Result any](t *testing.T, c TestServicesConfig) TestService logger, ) - backend, err := requestreply.NewPubSubBackend[Result]( - requestreply.PubSubBackendConfig{ - Publisher: pubSub, - SubscriberConstructor: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (message.Subscriber, error) { - assert.NotEmpty(t, subscriberContext.OperationID) - assert.NotEmpty(t, subscriberContext.Command) + backendConfig := requestreply.PubSubBackendConfig{ + Publisher: pubSub, + SubscriberConstructor: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (message.Subscriber, error) { + assert.NotEmpty(t, subscriberContext.OperationID) + assert.NotEmpty(t, subscriberContext.Command) - return pubSub, nil - }, - GenerateSubscribeTopic: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (string, error) { - assert.NotEmpty(t, subscriberContext.OperationID) - assert.NotEmpty(t, subscriberContext.Command) + return pubSub, nil + }, + GenerateSubscribeTopic: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (string, error) { + assert.NotEmpty(t, subscriberContext.OperationID) + assert.NotEmpty(t, subscriberContext.Command) - return "reply", nil - }, - GeneratePublishTopic: func(subscriberContext requestreply.PubSubBackendPublishParams) (string, error) { - assert.NotEmpty(t, subscriberContext.OperationID) - assert.NotEmpty(t, subscriberContext.Command) - assert.NotEmpty(t, subscriberContext.CommandMessage) + return "reply", nil + }, + GeneratePublishTopic: func(subscriberContext requestreply.PubSubBackendPublishParams) (string, error) { + assert.NotEmpty(t, subscriberContext.OperationID) + assert.NotEmpty(t, subscriberContext.Command) + assert.NotEmpty(t, subscriberContext.CommandMessage) - return "reply", nil - }, - Logger: logger, - ModifyNotificationMessage: func(msg *message.Message, params requestreply.PubSubBackendOnCommandProcessedParams) error { - // to make it deterministic - msg.UUID = "1" - - assert.NotEmpty(t, params.OperationID) - assert.NotEmpty(t, params.Command) - assert.NotEmpty(t, params.CommandMessage) - - // to ensure backward compatibility - if c.AssertNotificationMessage != nil { - c.AssertNotificationMessage(t, msg) - } + return "reply", nil + }, + Logger: logger, + ModifyNotificationMessage: func(msg *message.Message, params requestreply.PubSubBackendOnCommandProcessedParams) error { + // to make it deterministic + msg.UUID = "1" - return nil - }, - AckCommandErrors: !c.DoNotAckOnCommandErrors, - ListenForReplyTimeout: c.ListenForReplyTimeout, + assert.NotEmpty(t, params.OperationID) + assert.NotEmpty(t, params.Command) + assert.NotEmpty(t, params.CommandMessage) + + // to ensure backward compatibility + if c.AssertNotificationMessage != nil { + c.AssertNotificationMessage(t, msg) + } + + return nil }, + AckCommandErrors: !c.DoNotAckOnCommandErrors, + ListenForReplyTimeout: c.ListenForReplyTimeout, + } + backend, err := requestreply.NewPubSubBackend[Result]( + backendConfig, requestreply.BackendPubsubJSONMarshaler[Result]{}, ) require.NoError(t, err) @@ -130,6 +132,7 @@ func NewTestServices[Result any](t *testing.T, c TestServicesConfig) TestService CommandBus: commandBus, CommandProcessor: commandProcessor, Marshaler: marshaler, + BackendConfig: backendConfig, } } @@ -733,3 +736,31 @@ func TestRequestReply_parallel_same_handler(t *testing.T) { close(start) wg.Wait() } + +func TestNewPubSubBackend_missing_values(t *testing.T) { + t.Run("invalid_config", func(t *testing.T) { + invalidConfig := requestreply.PubSubBackendConfig{} + require.Error(t, invalidConfig.Validate()) + + backend, err := requestreply.NewPubSubBackend[requestreply.NoResult]( + invalidConfig, + requestreply.BackendPubsubJSONMarshaler[requestreply.NoResult]{}, + ) + assert.Error(t, err) + assert.ErrorContains(t, err, "invalid config") + assert.Nil(t, backend) + }) + + t.Run("missing_marshaler", func(t *testing.T) { + ts := NewTestServices[struct{}](t, TestServicesConfig{}) + require.NoError(t, ts.BackendConfig.Validate()) + + backend, err := requestreply.NewPubSubBackend[requestreply.NoResult]( + ts.BackendConfig, + nil, + ) + assert.Error(t, err) + assert.ErrorContains(t, err, "marshaler cannot be nil") + assert.Nil(t, backend) + }) +}