Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SD-103813 | implementation of the SQS receive message #28

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions inssqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ if err != nil {
}
```

### Receiving Messages
Receive batch of messages from SQS queue using `ReceiveMessageBatch()`

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code example below would be nice like examples above

## Configuration Options
- Region: AWS region where the SQS queue resides.
- QueueName: Name of the SQS queue.
Expand Down
38 changes: 38 additions & 0 deletions inssqs/inssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package inssqs

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -18,6 +19,7 @@ import (
type Interface interface {
SendMessageBatch(entries []SQSMessageEntry) (failed []SQSMessageEntry, err error)
DeleteMessageBatch(entries []SQSDeleteMessageEntry) (failed []SQSDeleteMessageEntry, err error)
ReceiveMessageBatch(rmi ReceiveMessageInput) (ReceiveMessageOutput, error)
}

type queue struct {
Expand Down Expand Up @@ -117,6 +119,42 @@ func (c *Config) setDefaults() {
}
}

// ReceiveMessageBatch receives a batch of messages from an SQS queue
//
// Parameters:
// - rmi: ReceiveMessageInput containing the parameters for receiving messages from the SQS queue.
//
// Returns:
// - res: A pointer to ReceiveMessageOutput containing the received messages.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't return the pointer

// - err: An error indicating any failure during the receiving process, nil if all messages were received successfully.
func (q *queue) ReceiveMessageBatch(rmi ReceiveMessageInput) (ReceiveMessageOutput, error) {
if rmi.MaxNumberOfMessages > 10 {
return ReceiveMessageOutput{}, fmt.Errorf("maxMessages should be less than or equal to 10")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt.Errorf is redundant here, formating isn't used. we can use errors.New in inssqs/errors.go

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return ReceiveMessageOutput{}, fmt.Errorf("maxMessages should be less than or equal to 10")
return ReceiveMessageOutput{}, fmt.Errorf("MaxNumberOfMessages should be less than or equal to 10")

}

if rmi.MaxNumberOfMessages < 1 {
return ReceiveMessageOutput{}, fmt.Errorf("maxMessages should be greater than 0")
}

input := &awssqs.ReceiveMessageInput{
QueueUrl: q.url,
MaxNumberOfMessages: rmi.MaxNumberOfMessages,
VisibilityTimeout: rmi.VisibilityTimeout,
AttributeNames: rmi.AttributeNames,
MessageAttributeNames: rmi.MessageAttributeNames,
ReceiveRequestAttemptId: rmi.ReceiveRequestAttemptId,
WaitTimeSeconds: rmi.WaitTimeSeconds,
}

res, err := q.client.ReceiveMessage(context.Background(), input)

if err != nil {
Comment on lines +149 to +151
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
res, err := q.client.ReceiveMessage(context.Background(), input)
if err != nil {
res, err := q.client.ReceiveMessage(context.Background(), input)
if err != nil {

in styling-wise, we prefer handling error directly with no new line between

return ReceiveMessageOutput{}, err
}

return ReceiveMessageOutput{Messages: res.Messages}, nil
}

// SendMessageBatch sends a batch of messages to an SQS queue, handling retries and respecting batch size constraints.
//
// Parameters:
Expand Down
14 changes: 14 additions & 0 deletions inssqs/inssqs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 127 additions & 1 deletion inssqs/model.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package inssqs

import "github.com/aws/aws-sdk-go-v2/service/sqs/types"
import (
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go/middleware"
)

type entry interface {
getId() *string
Expand Down Expand Up @@ -43,3 +46,126 @@ func (e SQSDeleteMessageEntry) toDeleteMessageBatchRequestEntry() types.DeleteMe
ReceiptHandle: e.ReceiptHandle,
}
}

type ReceiveMessageOutput struct {
// A list of messages.
Messages []types.Message

// Metadata pertaining to the operation's result.
ResultMetadata middleware.Metadata
}

type ReceiveMessageInput struct {

// The URL of the Amazon SQS queue from which messages are received. Queue URLs
// and names are case-sensitive.
//
// This member is required.
QueueUrl *string

// A list of attributes that need to be returned along with each message. These
// attributes include:
// - All – Returns all values.
// - ApproximateFirstReceiveTimestamp – Returns the time the message was first
// received from the queue ( epoch time (http://en.wikipedia.org/wiki/Unix_time)
// in milliseconds).
// - ApproximateReceiveCount – Returns the number of times a message has been
// received across all queues but not deleted.
// - AWSTraceHeader – Returns the X-Ray trace header string.
// - SenderId
// - For a user, returns the user ID, for example ABCDEFGHI1JKLMNOPQ23R .
// - For an IAM role, returns the IAM role ID, for example
// ABCDE1F2GH3I4JK5LMNOP:i-a123b456 .
// - SentTimestamp – Returns the time the message was sent to the queue ( epoch
// time (http://en.wikipedia.org/wiki/Unix_time) in milliseconds).
// - SqsManagedSseEnabled – Enables server-side queue encryption using SQS owned
// encryption keys. Only one server-side encryption option is supported per queue
// (for example, SSE-KMS (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-sse-existing-queue.html)
// or SSE-SQS (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-sqs-sse-queue.html)
// ).
// - MessageDeduplicationId – Returns the value provided by the producer that
// calls the SendMessage action.
// - MessageGroupId – Returns the value provided by the producer that calls the
// SendMessage action. Messages with the same MessageGroupId are returned in
// sequence.
// - SequenceNumber – Returns the value provided by Amazon SQS.
AttributeNames []types.QueueAttributeName

// The maximum number of messages to return. Amazon SQS never returns more
// messages than this value (however, fewer messages might be returned). Valid
// values: 1 to 10. Default: 1.
MaxNumberOfMessages int32

// The name of the message attribute, where N is the index.
// - The name can contain alphanumeric characters and the underscore ( _ ),
// hyphen ( - ), and period ( . ).
// - The name is case-sensitive and must be unique among all attribute names for
// the message.
// - The name must not start with AWS-reserved prefixes such as AWS. or Amazon.
// (or any casing variants).
// - The name must not start or end with a period ( . ), and it should not have
// periods in succession ( .. ).
// - The name can be up to 256 characters long.
// When using ReceiveMessage , you can send a list of attribute names to receive,
// or you can return all of the attributes by specifying All or .* in your
// request. You can also use all message attributes starting with a prefix, for
// example bar.* .
MessageAttributeNames []string

// This parameter applies only to FIFO (first-in-first-out) queues. The token used
// for deduplication of ReceiveMessage calls. If a networking issue occurs after a
// ReceiveMessage action, and instead of a response you receive a generic error, it
// is possible to retry the same action with an identical ReceiveRequestAttemptId
// to retrieve the same set of messages, even if their visibility timeout has not
// yet expired.
// - You can use ReceiveRequestAttemptId only for 5 minutes after a
// ReceiveMessage action.
// - When you set FifoQueue , a caller of the ReceiveMessage action can provide a
// ReceiveRequestAttemptId explicitly.
// - If a caller of the ReceiveMessage action doesn't provide a
// ReceiveRequestAttemptId , Amazon SQS generates a ReceiveRequestAttemptId .
// - It is possible to retry the ReceiveMessage action with the same
// ReceiveRequestAttemptId if none of the messages have been modified (deleted or
// had their visibility changes).
// - During a visibility timeout, subsequent calls with the same
// ReceiveRequestAttemptId return the same messages and receipt handles. If a
// retry occurs within the deduplication interval, it resets the visibility
// timeout. For more information, see Visibility Timeout (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
// in the Amazon SQS Developer Guide. If a caller of the ReceiveMessage action
// still processes messages when the visibility timeout expires and messages become
// visible, another worker consuming from the same queue can receive the same
// messages and therefore process duplicates. Also, if a consumer whose message
// processing time is longer than the visibility timeout tries to delete the
// processed messages, the action fails with an error. To mitigate this effect,
// ensure that your application observes a safe threshold before the visibility
// timeout expires and extend the visibility timeout as necessary.
// - While messages with a particular MessageGroupId are invisible, no more
// messages belonging to the same MessageGroupId are returned until the
// visibility timeout expires. You can still receive messages with another
// MessageGroupId as long as it is also visible.
// - If a caller of ReceiveMessage can't track the ReceiveRequestAttemptId , no
// retries work until the original visibility timeout expires. As a result, delays
// might occur but the messages in the queue remain in a strict order.
// The maximum length of ReceiveRequestAttemptId is 128 characters.
// ReceiveRequestAttemptId can contain alphanumeric characters ( a-z , A-Z , 0-9 )
// and punctuation ( !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ). For best practices of
// using ReceiveRequestAttemptId , see Using the ReceiveRequestAttemptId Request
// Parameter (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-receiverequestattemptid-request-parameter.html)
// in the Amazon SQS Developer Guide.
ReceiveRequestAttemptId *string

// The duration (in seconds) that the received messages are hidden from subsequent
// retrieve requests after being retrieved by a ReceiveMessage request.
VisibilityTimeout int32

// The duration (in seconds) for which the call waits for a message to arrive in
// the queue before returning. If a message is available, the call returns sooner
// than WaitTimeSeconds . If no messages are available and the wait time expires,
// the call returns successfully with an empty list of messages. To avoid HTTP
// errors, ensure that the HTTP response timeout for ReceiveMessage requests is
// longer than the WaitTimeSeconds parameter. For example, with the Java SDK, you
// can set HTTP transport settings using the NettyNioAsyncHttpClient (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.html)
// for asynchronous clients, or the ApacheHttpClient (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.html)
// for synchronous clients.
WaitTimeSeconds int32
}
5 changes: 5 additions & 0 deletions inssqs/sqs/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type API interface {
SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
DeleteMessageBatch(ctx context.Context, params *sqs.DeleteMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error)
ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
}

type proxy struct {
Expand All @@ -30,3 +31,7 @@ func (p *proxy) GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, o
func (p *proxy) DeleteMessageBatch(ctx context.Context, params *sqs.DeleteMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error) {
return p.client.DeleteMessageBatch(ctx, params, optFns...)
}

func (p *proxy) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
return p.client.ReceiveMessage(ctx, params, optFns...)
}
20 changes: 20 additions & 0 deletions inssqs/sqs/sqs_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,26 @@ type MockAPI struct {
recorder *MockAPIMockRecorder
}

// ReceiveMessage mocks base method.
func (m *MockAPI) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, params}
for _, a := range optFns {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ReceiveMessage", varargs...)
ret0, _ := ret[0].(*sqs.ReceiveMessageOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// ReceiveMessage indicates an expected call of ReceiveMessage.
func (mr *MockAPIMockRecorder) ReceiveMessage(ctx, params interface{}, optFns ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, params}, optFns...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockAPI)(nil).ReceiveMessage), varargs...)
}

// MockAPIMockRecorder is the mock recorder for MockAPI.
type MockAPIMockRecorder struct {
mock *MockAPI
Expand Down
Loading