Skip to content

Commit

Permalink
check codeconv on PR (#398)
Browse files Browse the repository at this point in the history
* check codeconv on PR

* added extra tests for requestreply
  • Loading branch information
roblaszczak authored Sep 24, 2023
1 parent be7b7db commit 8786614
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ on:
pull_request:
jobs:
ci:
uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@lint
uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@codeconv-pr
9 changes: 9 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ jobs:
- run: make test_stress
if: ${{ inputs.stress-tests }}

codecov:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/setup-go@v2
with:
go-version: '^1.21.1'
- run: make test_codecov
- uses: codecov/codecov-action@v3
13 changes: 8 additions & 5 deletions components/requestreply/backend_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -97,20 +98,22 @@ func (p *PubSubBackendConfig) setDefaults() {
}

func (p *PubSubBackendConfig) Validate() error {
var err error

if p.Publisher == nil {
return errors.New("publisher cannot be nil")
err = multierror.Append(err, errors.New("publisher cannot be nil"))
}
if p.SubscriberConstructor == nil {
return errors.New("subscriber constructor cannot be nil")
err = multierror.Append(err, errors.New("subscriber constructor cannot be nil"))
}
if p.GeneratePublishTopic == nil {
return errors.New("GeneratePublishTopic cannot be nil")
err = multierror.Append(err, errors.New("GeneratePublishTopic cannot be nil"))
}
if p.GenerateSubscribeTopic == nil {
return errors.New("GenerateSubscribeTopic cannot be nil")
err = multierror.Append(err, errors.New("GenerateSubscribeTopic cannot be nil"))
}

return nil
return err
}

func (p PubSubBackend[Result]) ListenForNotifications(
Expand Down
103 changes: 67 additions & 36 deletions components/requestreply/requestreply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type TestServices[Result any] struct {
CommandProcessor *cqrs.CommandProcessor

RequestReplyBackend *requestreply.PubSubBackend[Result]
BackendConfig requestreply.PubSubBackendConfig
}

type TestServicesConfig struct {
Expand All @@ -50,47 +51,48 @@ func NewTestServices[Result any](t *testing.T, c TestServicesConfig) TestService
logger,
)

backend, err := requestreply.NewPubSubBackend[Result](
requestreply.PubSubBackendConfig{
Publisher: pubSub,
SubscriberConstructor: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (message.Subscriber, error) {
assert.NotEmpty(t, subscriberContext.OperationID)
assert.NotEmpty(t, subscriberContext.Command)
backendConfig := requestreply.PubSubBackendConfig{
Publisher: pubSub,
SubscriberConstructor: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (message.Subscriber, error) {
assert.NotEmpty(t, subscriberContext.OperationID)
assert.NotEmpty(t, subscriberContext.Command)

return pubSub, nil
},
GenerateSubscribeTopic: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (string, error) {
assert.NotEmpty(t, subscriberContext.OperationID)
assert.NotEmpty(t, subscriberContext.Command)
return pubSub, nil
},
GenerateSubscribeTopic: func(subscriberContext requestreply.PubSubBackendSubscribeParams) (string, error) {
assert.NotEmpty(t, subscriberContext.OperationID)
assert.NotEmpty(t, subscriberContext.Command)

return "reply", nil
},
GeneratePublishTopic: func(subscriberContext requestreply.PubSubBackendPublishParams) (string, error) {
assert.NotEmpty(t, subscriberContext.OperationID)
assert.NotEmpty(t, subscriberContext.Command)
assert.NotEmpty(t, subscriberContext.CommandMessage)
return "reply", nil
},
GeneratePublishTopic: func(subscriberContext requestreply.PubSubBackendPublishParams) (string, error) {
assert.NotEmpty(t, subscriberContext.OperationID)
assert.NotEmpty(t, subscriberContext.Command)
assert.NotEmpty(t, subscriberContext.CommandMessage)

return "reply", nil
},
Logger: logger,
ModifyNotificationMessage: func(msg *message.Message, params requestreply.PubSubBackendOnCommandProcessedParams) error {
// to make it deterministic
msg.UUID = "1"

assert.NotEmpty(t, params.OperationID)
assert.NotEmpty(t, params.Command)
assert.NotEmpty(t, params.CommandMessage)

// to ensure backward compatibility
if c.AssertNotificationMessage != nil {
c.AssertNotificationMessage(t, msg)
}
return "reply", nil
},
Logger: logger,
ModifyNotificationMessage: func(msg *message.Message, params requestreply.PubSubBackendOnCommandProcessedParams) error {
// to make it deterministic
msg.UUID = "1"

return nil
},
AckCommandErrors: !c.DoNotAckOnCommandErrors,
ListenForReplyTimeout: c.ListenForReplyTimeout,
assert.NotEmpty(t, params.OperationID)
assert.NotEmpty(t, params.Command)
assert.NotEmpty(t, params.CommandMessage)

// to ensure backward compatibility
if c.AssertNotificationMessage != nil {
c.AssertNotificationMessage(t, msg)
}

return nil
},
AckCommandErrors: !c.DoNotAckOnCommandErrors,
ListenForReplyTimeout: c.ListenForReplyTimeout,
}
backend, err := requestreply.NewPubSubBackend[Result](
backendConfig,
requestreply.BackendPubsubJSONMarshaler[Result]{},
)
require.NoError(t, err)
Expand Down Expand Up @@ -130,6 +132,7 @@ func NewTestServices[Result any](t *testing.T, c TestServicesConfig) TestService
CommandBus: commandBus,
CommandProcessor: commandProcessor,
Marshaler: marshaler,
BackendConfig: backendConfig,
}
}

Expand Down Expand Up @@ -733,3 +736,31 @@ func TestRequestReply_parallel_same_handler(t *testing.T) {
close(start)
wg.Wait()
}

func TestNewPubSubBackend_missing_values(t *testing.T) {
t.Run("invalid_config", func(t *testing.T) {
invalidConfig := requestreply.PubSubBackendConfig{}
require.Error(t, invalidConfig.Validate())

backend, err := requestreply.NewPubSubBackend[requestreply.NoResult](
invalidConfig,
requestreply.BackendPubsubJSONMarshaler[requestreply.NoResult]{},
)
assert.Error(t, err)
assert.ErrorContains(t, err, "invalid config")
assert.Nil(t, backend)
})

t.Run("missing_marshaler", func(t *testing.T) {
ts := NewTestServices[struct{}](t, TestServicesConfig{})
require.NoError(t, ts.BackendConfig.Validate())

backend, err := requestreply.NewPubSubBackend[requestreply.NoResult](
ts.BackendConfig,
nil,
)
assert.Error(t, err)
assert.ErrorContains(t, err, "marshaler cannot be nil")
assert.Nil(t, backend)
})
}

0 comments on commit 8786614

Please sign in to comment.