Skip to content

Commit

Permalink
Fix: Propagate AWSTraceHeader (#1287)
Browse files Browse the repository at this point in the history
Fixes #1262
  • Loading branch information
acomofcg authored Dec 11, 2024
1 parent a08ebc1 commit a9f51fc
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ private Integer getDelaySeconds(Message message) {
private Map<MessageSystemAttributeNameForSends, MessageSystemAttributeValue> mapMessageSystemAttributes(
Message message) {
return message.attributes().entrySet().stream().filter(Predicate.not(entry -> isSkipAttribute(entry.getKey())))
.collect(Collectors.toMap(entry -> MessageSystemAttributeNameForSends.fromValue(entry.getKey().name()),
.collect(Collectors.toMap(entry -> MessageSystemAttributeNameForSends.fromValue(entry.getKey().toString()),
entry -> MessageSystemAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING)
.stringValue(entry.getValue()).build()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public Message fromHeaders(MessageHeaders headers) {
attributes.put(MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID,
headers.get(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, String.class));
}
if (headers.containsKey(SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER)) {
attributes.put(MessageSystemAttributeName.AWS_TRACE_HEADER,
headers.get(SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER, String.class));
}
Map<String, MessageAttributeValue> messageAttributes = headers.entrySet().stream()
.filter(entry -> !isSkipHeader(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
entry -> getMessageAttributeValue(entry.getKey(), entry.getValue())));
Expand Down Expand Up @@ -110,6 +114,7 @@ else if (messageHeaderValue instanceof ByteBuffer) {
private boolean isSkipHeader(String headerName) {
return SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER.equals(headerName)
|| SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER.equals(headerName)
|| SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER.equals(headerName)
|| SqsHeaders.SQS_DELAY_HEADER.equals(headerName) || MessageHeaders.ID.equals(headerName)
|| MessageHeaders.TIMESTAMP.equals(headerName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.QueueAttributesResolvingException;
Expand All @@ -47,23 +46,7 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.services.sqs.model.*;

/**
* @author Tomaz Fernandes
Expand Down Expand Up @@ -1208,4 +1191,35 @@ void shouldReceiveBatchFifo() {

}

@Test
void shouldPropagateTracingAsMessageSystemAttribute() {
String queue = "test-queue";
GetQueueUrlResponse urlResponse = GetQueueUrlResponse.builder().queueUrl(queue).build();
given(mockClient.getQueueUrl(any(GetQueueUrlRequest.class)))
.willReturn(CompletableFuture.completedFuture(urlResponse));
mockQueueAttributes(mockClient, Map.of());
SendMessageResponse response = SendMessageResponse.builder().messageId(UUID.randomUUID().toString())
.sequenceNumber("123").build();
given(mockClient.sendMessage(any(SendMessageRequest.class)))
.willReturn(CompletableFuture.completedFuture(response));

SqsOperations sqsOperations = SqsTemplate.newSyncTemplate(mockClient);
SendResult<Object> result = sqsOperations.send(options -> options
.queue(queue)
.header(SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER, "abc")
.payload("test")
);

assertThat(result).isNotNull();

ArgumentCaptor<SendMessageRequest> captor = ArgumentCaptor.forClass(SendMessageRequest.class);
then(mockClient).should().sendMessage(captor.capture());
SendMessageRequest sendMessageRequest = captor.getValue();

assertThat(sendMessageRequest.messageSystemAttributes()).hasEntrySatisfying(
MessageSystemAttributeNameForSends.AWS_TRACE_HEADER,
value -> assertThat(value.stringValue()).isEqualTo("abc")
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

import io.awspring.cloud.sqs.listener.SqsHeaders;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -29,6 +31,7 @@
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;

/**
* Tests for {@link SqsHeaderMapper}.
Expand Down Expand Up @@ -139,6 +142,36 @@ void shouldAddNumberMessageAttributes() {
assertThat(headers.get(headerName)).isEqualTo(headerValue);
}

@Test
void shouldCreateMessageWithSystemAttributesFromHeaders() {
MessageHeaders headers = new MessageHeaders(
Map.of(
SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, "value1",
SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, "value2",
SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER, "value3",
"customHeaderString", "customValueString",
"customHeaderNumber", 42
)
);

SqsHeaderMapper mapper = new SqsHeaderMapper();
Message message = mapper.fromHeaders(headers);

assertThat(message.attributes())
.hasSize(3)
.containsExactlyInAnyOrderEntriesOf(Map.of(
MessageSystemAttributeName.MESSAGE_GROUP_ID, "value1",
MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID, "value2",
MessageSystemAttributeName.AWS_TRACE_HEADER, "value3"
));
assertThat(message.messageAttributes())
.hasSize(2)
.containsExactlyInAnyOrderEntriesOf(Map.of(
"customHeaderString", MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING).stringValue("customValueString").build(),
"customHeaderNumber", MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Integer").stringValue("42").build()
));
}

@ParameterizedTest
@MethodSource("validArguments")
void createsMessageWithNumberHeader(String value, String type, Number expected) {
Expand Down

0 comments on commit a9f51fc

Please sign in to comment.