diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java index e11a92bdb..401f07248 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java @@ -35,6 +35,7 @@ import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource; import io.awspring.cloud.sqs.listener.source.MessageSource; import io.awspring.cloud.sqs.listener.source.PollingMessageSource; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -226,17 +227,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) { protected BackPressureHandler createBackPressureHandler() { O containerOptions = getContainerOptions(); - BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder() - .batchSize(containerOptions.getMaxMessagesPerPoll()) - .totalPermits(containerOptions.getMaxConcurrentMessages()) - .acquireTimeout(containerOptions.getMaxDelayBetweenPolls()) - .throughputConfiguration(containerOptions.getBackPressureMode()).build(); + List backPressureHandlers = new ArrayList<>(2); + Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls(); + int batchSize = containerOptions.getMaxMessagesPerPoll(); + backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize) + .totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout) + .throughputConfiguration(containerOptions.getBackPressureMode()).build()); if (containerOptions.getBackPressureLimiter() != null) { - backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler, - containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(), - containerOptions.getMaxDelayBetweenPolls()); + backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(), + acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize)); } - return backPressureHandler; + return new CompositeBackPressureHandler(backPressureHandlers, batchSize); } protected TaskExecutor createSourcesTaskExecutor() { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java index 1d76d6589..f2ff274b1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java @@ -29,7 +29,7 @@ public interface BackPressureHandler { /** - * Request a number of permits. Each obtained permit allows the + * Requests a number of permits. Each obtained permit allows the * {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message. * @param amount the amount of permits to request. * @return the amount of permits obtained. @@ -38,11 +38,24 @@ public interface BackPressureHandler { int request(int amount) throws InterruptedException; /** - * Release the specified amount of permits. Each message that has been processed should release one permit, whether - * processing was successful or not. + * Releases the specified amount of permits for processed messages. Each message that has been processed should + * release one permit, whether processing was successful or not. + *

+ * This method can is called in the following use cases: + *

* @param amount the amount of permits to release. + * @param reason the reason why the permits were released. */ - void release(int amount); + void release(int amount, ReleaseReason reason); /** * Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and @@ -52,4 +65,24 @@ public interface BackPressureHandler { */ boolean drain(Duration timeout); + enum ReleaseReason { + /** + * Permits were not used because another BackPressureHandler has a lower permits limit and the difference need + * to be aligned across all handlers. + */ + LIMITED, + /** + * No messages were retrieved from SQS, so all permits need to be returned. + */ + NONE_FETCHED, + /** + * Some messages were fetched from SQS. Unused permits need to be returned. + */ + PARTIAL_FETCH, + /** + * The processing of one or more messages finished, successfully or not. + */ + PROCESSED; + } + } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java index aeb5a61cb..cd031a129 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java @@ -27,12 +27,7 @@ * * @see BackPressureLimiter */ -public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler { - - /** - * The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}. - */ - private final BatchAwareBackPressureHandler backPressureHandler; +public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent { /** * The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment. @@ -59,50 +54,54 @@ public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0); - public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler, - BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) { - this.backPressureHandler = backPressureHandler; + private final int batchSize; + + private String id; + + public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout, + Duration standbyLimitPollingInterval, int batchSize) { this.backPressureLimiter = backPressureLimiter; this.acquireTimeout = acquireTimeout; this.standbyLimitPollingInterval = standbyLimitPollingInterval; + this.batchSize = batchSize; } @Override - public int requestBatch() throws InterruptedException { - int permits = updatePermitsLimit(); - int batchSize = getBatchSize(); - if (permits < batchSize) { - return acquirePermits(permits, backPressureHandler::request); - } - return acquirePermits(batchSize, p -> backPressureHandler.requestBatch()); + public void setId(String id) { + this.id = id; } @Override - public void releaseBatch() { - semaphore.release(getBatchSize()); - backPressureHandler.releaseBatch(); + public String getId() { + return id; } @Override - public int getBatchSize() { - return backPressureHandler.getBatchSize(); + public int requestBatch() throws InterruptedException { + return request(batchSize); } @Override public int request(int amount) throws InterruptedException { int permits = Math.min(updatePermitsLimit(), amount); - return acquirePermits(permits, backPressureHandler::request); + if (permits == 0) { + Thread.sleep(standbyLimitPollingInterval.toMillis()); + return 0; + } + if (semaphore.tryAcquire(permits, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + return permits; + } + return 0; } @Override - public void release(int amount) { + public void release(int amount, ReleaseReason reason) { semaphore.release(amount); - backPressureHandler.release(amount); } @Override public boolean drain(Duration timeout) { - return backPressureHandler.drain(timeout); + return true; } private int updatePermitsLimit() { @@ -120,25 +119,6 @@ else if (newLimit > oldLimit) { }); } - private interface PermitsRequester { - int request(int amount) throws InterruptedException; - } - - private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException { - if (amount == 0) { - Thread.sleep(standbyLimitPollingInterval.toMillis()); - return 0; - } - if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) { - int obtained = permitsRequester.request(amount); - if (obtained < amount) { - semaphore.release(amount - obtained); - } - return obtained; - } - return 0; - } - private static class ReducibleSemaphore extends Semaphore { ReducibleSemaphore(int permits) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java index 51e12e0a0..c9ce20f9b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java @@ -30,18 +30,4 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler { * @throws InterruptedException if the Thread is interrupted while waiting for permits. */ int requestBatch() throws InterruptedException; - - /** - * Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all - * permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits - * can be specified. - */ - void releaseBatch(); - - /** - * Return the configured batch size for this handler. - * @return the batch size. - */ - int getBatchSize(); - } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java new file mode 100644 index 000000000..42202438b --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java @@ -0,0 +1,84 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener; + +import java.time.Duration; +import java.util.List; + +public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent { + + private final List backPressureHandlers; + + private final int batchSize; + + private String id; + + public CompositeBackPressureHandler(List backPressureHandlers, int batchSize) { + this.backPressureHandlers = backPressureHandlers; + this.batchSize = batchSize; + } + + @Override + public void setId(String id) { + this.id = id; + backPressureHandlers.stream().filter(IdentifiableContainerComponent.class::isInstance) + .map(IdentifiableContainerComponent.class::cast) + .forEach(bph -> bph.setId(bph.getClass().getSimpleName() + "-" + id)); + } + + @Override + public String getId() { + return id; + } + + @Override + public int requestBatch() throws InterruptedException { + return request(batchSize); + } + + @Override + public int request(int amount) throws InterruptedException { + int obtained = amount; + int[] obtainedPerBph = new int[backPressureHandlers.size()]; + for (int i = 0; i < backPressureHandlers.size() && obtained > 0; i++) { + obtainedPerBph[i] = backPressureHandlers.get(i).request(obtained); + obtained = Math.min(obtained, obtainedPerBph[i]); + } + for (int i = 0; i < backPressureHandlers.size(); i++) { + int obtainedForBph = obtainedPerBph[i]; + if (obtainedForBph > obtained) { + backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED); + } + } + return obtained; + } + + @Override + public void release(int amount, ReleaseReason reason) { + for (BackPressureHandler handler : backPressureHandlers) { + handler.release(amount, reason); + } + } + + @Override + public boolean drain(Duration timeout) { + boolean result = true; + for (BackPressureHandler handler : backPressureHandlers) { + result &= !handler.drain(timeout); + } + return result; + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java index 310b64519..70ed3f306 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java @@ -19,7 +19,7 @@ import java.util.Arrays; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -47,7 +47,7 @@ public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandl private volatile CurrentThroughputMode currentThroughputMode; - private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false); + private final AtomicInteger lowThroughputPermitsAcquired = new AtomicInteger(0); private String id; @@ -79,31 +79,31 @@ public String getId() { } @Override - public int request(int amount) throws InterruptedException { - return tryAcquire(amount, this.currentThroughputMode) ? amount : 0; + public int requestBatch() throws InterruptedException { + return request(batchSize); } // @formatter:off @Override - public int requestBatch() throws InterruptedException { + public int request(int amount) throws InterruptedException { return CurrentThroughputMode.LOW.equals(this.currentThroughputMode) - ? requestInLowThroughputMode() - : requestInHighThroughputMode(); + ? requestInLowThroughputMode(amount) + : requestInHighThroughputMode(amount); } - private int requestInHighThroughputMode() throws InterruptedException { - return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH) - ? this.batchSize - : tryAcquirePartial(); + private int requestInHighThroughputMode(int amount) throws InterruptedException { + return tryAcquire(amount, CurrentThroughputMode.HIGH) + ? amount + : tryAcquirePartial(amount); } // @formatter:on - private int tryAcquirePartial() throws InterruptedException { + private int tryAcquirePartial(int max) throws InterruptedException { int availablePermits = this.semaphore.availablePermits(); if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) { return 0; } - int permitsToRequest = Math.min(availablePermits, this.batchSize); + int permitsToRequest = Math.min(availablePermits, max); CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode; logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}", permitsToRequest, availablePermits, this.id, currentThroughputModeNow); @@ -111,7 +111,7 @@ private int tryAcquirePartial() throws InterruptedException { return hasAcquiredPartial ? permitsToRequest : 0; } - private int requestInLowThroughputMode() throws InterruptedException { + private int requestInLowThroughputMode(int amount) throws InterruptedException { // Although LTM can be set / unset by many processes, only the MessageSource thread gets here, // so no actual concurrency logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id, @@ -120,11 +120,11 @@ private int requestInLowThroughputMode() throws InterruptedException { if (hasAcquired) { logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits()); // We've acquired all permits - there's no other process currently processing messages - if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) { + if (this.lowThroughputPermitsAcquired.getAndSet(amount) != 0) { logger.warn("hasAcquiredFullPermits was already true. Permits left: {}", this.semaphore.availablePermits()); } - return this.batchSize; + return amount; } else { return 0; @@ -147,19 +147,22 @@ private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputMo } @Override - public void releaseBatch() { - maybeSwitchToLowThroughputMode(); - int permitsToRelease = getPermitsToRelease(this.batchSize); + public void release(int amount, ReleaseReason reason) { + logger.trace("Releasing {} permits ({}) for {}. Permits left: {}", amount, reason, this.id, + this.semaphore.availablePermits()); + switch (reason) { + case NONE_FETCHED -> maybeSwitchToLowThroughputMode(); + case PARTIAL_FETCH -> maybeSwitchToHighThroughputMode(amount); + case PROCESSED, LIMITED -> { + // No need to switch throughput mode + } + } + int permitsToRelease = getPermitsToRelease(amount); this.semaphore.release(permitsToRelease); - logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id, + logger.debug("Released {} permits ({}) for {}. Permits left: {}", permitsToRelease, reason, this.id, this.semaphore.availablePermits()); } - @Override - public int getBatchSize() { - return this.batchSize; - } - private void maybeSwitchToLowThroughputMode() { if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration) && CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) { @@ -169,25 +172,6 @@ private void maybeSwitchToLowThroughputMode() { } } - @Override - public void release(int amount) { - logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id, - this.semaphore.availablePermits()); - maybeSwitchToHighThroughputMode(amount); - int permitsToRelease = getPermitsToRelease(amount); - this.semaphore.release(permitsToRelease); - logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id, - this.semaphore.availablePermits()); - } - - private int getPermitsToRelease(int amount) { - return this.hasAcquiredFullPermits.compareAndSet(true, false) - // The first process that gets here should release all permits except for inflight messages - // We can have only one batch of messages at this point since we have all permits - ? this.totalPermits - (this.batchSize - amount) - : amount; - } - private void maybeSwitchToHighThroughputMode(int amount) { if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) { logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id, @@ -196,6 +180,15 @@ private void maybeSwitchToHighThroughputMode(int amount) { } } + private int getPermitsToRelease(int amount) { + int lowThroughputPermits = this.lowThroughputPermitsAcquired.getAndSet(0); + return lowThroughputPermits > 0 + // The first process that gets here should release all permits except for inflight messages + // We can have only one batch of messages at this point since we have all permits + ? this.totalPermits - (lowThroughputPermits - amount) + : amount; + } + @Override public boolean drain(Duration timeout) { logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(), diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java index e71dc4319..9041cd9c8 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java @@ -17,6 +17,7 @@ import io.awspring.cloud.sqs.ConfigUtils; import io.awspring.cloud.sqs.listener.BackPressureHandler; +import io.awspring.cloud.sqs.listener.BackPressureHandler.ReleaseReason; import io.awspring.cloud.sqs.listener.BatchAwareBackPressureHandler; import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent; @@ -214,7 +215,7 @@ private void pollAndEmitMessages() { if (!isRunning()) { logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits", acquiredPermits); - this.backPressureHandler.release(acquiredPermits); + this.backPressureHandler.release(acquiredPermits, ReleaseReason.NONE_FETCHED); continue; } // @formatter:off @@ -252,15 +253,12 @@ private void handlePollBackOff() { protected abstract CompletableFuture> doPollForMessages(int messagesToRequest); public Collection> releaseUnusedPermits(int permits, Collection> msgs) { - if (msgs.isEmpty() && permits == this.backPressureHandler.getBatchSize()) { - this.backPressureHandler.releaseBatch(); - logger.trace("Released batch of unused permits for queue {}", this.pollingEndpointName); - } - else { - int permitsToRelease = permits - msgs.size(); - this.backPressureHandler.release(permitsToRelease); - logger.trace("Released {} unused permits for queue {}", permitsToRelease, this.pollingEndpointName); - } + int polledMessages = msgs.size(); + int permitsToRelease = permits - polledMessages; + ReleaseReason releaseReason = polledMessages == 0 ? ReleaseReason.NONE_FETCHED : ReleaseReason.PARTIAL_FETCH; + this.backPressureHandler.release(permitsToRelease, releaseReason); + logger.trace("Released {} unused ({}) permits for queue {} (messages polled {})", permitsToRelease, + releaseReason, this.pollingEndpointName, polledMessages); return msgs; } @@ -285,7 +283,7 @@ protected AcknowledgementCallback getAcknowledgementCallback() { private void releaseBackPressure() { logger.debug("Releasing permit for queue {}", this.pollingEndpointName); - this.backPressureHandler.release(1); + this.backPressureHandler.release(1, ReleaseReason.PROCESSED); } private Void handleSinkException(Throwable t) {