Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Nov 3, 2023
1 parent cf6e8f8 commit 1b8fad6
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6));
when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60));
when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true);
when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
final SqsWorker objectUnderTest = createObjectUnderTest();
final int sqsMessagesProcessed = objectUnderTest.processSqsMessages();
Expand Down Expand Up @@ -420,7 +420,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6));
when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60));
when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true);
when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
final SqsWorker objectUnderTest = createObjectUnderTest();
final int sqsMessagesProcessed = objectUnderTest.processSqsMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,23 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
final int progressCheckInterval = visibilityTimeout/2 - 1;
if (endToEndAcknowledgementsEnabled) {
int expiryTimeout = visibilityTimeout - 2;
final boolean extendedVisibilityTimeoutEnabled = sqsOptions.getExtendVisibilityTimeout();
if (extendedVisibilityTimeoutEnabled) {
final boolean visibilityDuplicateProtectionEnabled = sqsOptions.getVisibilityDuplicateProtection();
if (visibilityDuplicateProtectionEnabled) {
expiryTimeout = maxVisibilityTimeout;
}
acknowledgementSet = acknowledgementSetManager.create(
(result) -> {
acknowledgementSetCallbackCounter.increment();
// Delete only if this is positive acknowledgement
if (extendedVisibilityTimeoutEnabled) {
if (visibilityDuplicateProtectionEnabled) {
parsedMessageVisibilityTimesMap.remove(parsedMessage);
}
if (result == true) {
deleteSqsMessages(waitingForAcknowledgements);
}
},
Duration.ofSeconds(expiryTimeout));
if (extendedVisibilityTimeoutEnabled) {
if (visibilityDuplicateProtectionEnabled) {
acknowledgementSet.addProgressCheck(
(ratio) -> {
final int newVisibilityTimeoutSeconds = visibilityTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

public class SqsOptions {
private static final int DEFAULT_MAXIMUM_MESSAGES = 10;
private static final Boolean DEFAULT_EXTEND_VISIBILITY_TIMEOUT = false;
private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false;
private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30);
private static final Duration DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION = Duration.ofSeconds(1800); // 30 minutes
private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20);
Expand All @@ -36,8 +36,8 @@ public class SqsOptions {
@DurationMax(seconds = 43200)
private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS;

@JsonProperty("extend_visibility_timeout")
private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT;
@JsonProperty("visibility_duplication_protection")
private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION;

@JsonProperty("max_visibility_timeout_extension")
@DurationMin(seconds = 30)
Expand Down Expand Up @@ -69,8 +69,8 @@ public Duration getMaxVisibilityTimeoutExtension() {
return maxVisibilityTimeoutExtension;
}

public Boolean getExtendVisibilityTimeout() {
return extendVisibilityTimeout;
public Boolean getVisibilityDuplicateProtection() {
return visibilityDuplicateProtection;
}

public Duration getWaitTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void processSqsMessages_should_return_number_of_messages_processed_with_acknowle
@ParameterizedTest
@ValueSource(strings = {"ObjectCreated:Put", "ObjectCreated:Post", "ObjectCreated:Copy", "ObjectCreated:CompleteMultipartUpload"})
void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements_and_progress_check(final String eventName) throws IOException {
when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true);
when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true);
when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6));
when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
Expand Down

0 comments on commit 1b8fad6

Please sign in to comment.