diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java
index ec12bc607..401f07248 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java
@@ -235,7 +235,7 @@ protected BackPressureHandler createBackPressureHandler() {
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
if (containerOptions.getBackPressureLimiter() != null) {
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
- acquireTimeout, containerOptions.getStandbyLimitPollingInterval()));
+ acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
}
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java
index 1d76d6589..f2ff274b1 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java
@@ -29,7 +29,7 @@
public interface BackPressureHandler {
/**
- * Request a number of permits. Each obtained permit allows the
+ * Requests a number of permits. Each obtained permit allows the
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
* @param amount the amount of permits to request.
* @return the amount of permits obtained.
@@ -38,11 +38,24 @@ public interface BackPressureHandler {
int request(int amount) throws InterruptedException;
/**
- * Release the specified amount of permits. Each message that has been processed should release one permit, whether
- * processing was successful or not.
+ * Releases the specified amount of permits for processed messages. Each message that has been processed should
+ * release one permit, whether processing was successful or not.
+ *
+ * This method can is called in the following use cases:
+ *
+ * - {@link ReleaseReason#LIMITED}: permits were not used because another BackPressureHandler has a lower permits
+ * limit and the difference in permits needs to be returned.
+ * - {@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
+ * from SQS. Permits need to be returned.
+ * - {@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
+ * The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
+ * the permits will be used as the same number of messages were fetched from SQS.
+ * - {@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.
+ *
* @param amount the amount of permits to release.
+ * @param reason the reason why the permits were released.
*/
- void release(int amount);
+ void release(int amount, ReleaseReason reason);
/**
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
@@ -52,4 +65,24 @@ public interface BackPressureHandler {
*/
boolean drain(Duration timeout);
+ enum ReleaseReason {
+ /**
+ * Permits were not used because another BackPressureHandler has a lower permits limit and the difference need
+ * to be aligned across all handlers.
+ */
+ LIMITED,
+ /**
+ * No messages were retrieved from SQS, so all permits need to be returned.
+ */
+ NONE_FETCHED,
+ /**
+ * Some messages were fetched from SQS. Unused permits need to be returned.
+ */
+ PARTIAL_FETCH,
+ /**
+ * The processing of one or more messages finished, successfully or not.
+ */
+ PROCESSED;
+ }
+
}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java
index 0ff48a412..cd031a129 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java
@@ -27,7 +27,7 @@
*
* @see BackPressureLimiter
*/
-public class BackPressureHandlerLimiter implements BackPressureHandler {
+public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
/**
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
@@ -54,11 +54,31 @@ public class BackPressureHandlerLimiter implements BackPressureHandler {
private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);
+ private final int batchSize;
+
+ private String id;
+
public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
- Duration standbyLimitPollingInterval) {
+ Duration standbyLimitPollingInterval, int batchSize) {
this.backPressureLimiter = backPressureLimiter;
this.acquireTimeout = acquireTimeout;
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public int requestBatch() throws InterruptedException {
+ return request(batchSize);
}
@Override
@@ -75,7 +95,7 @@ public int request(int amount) throws InterruptedException {
}
@Override
- public void release(int amount) {
+ public void release(int amount, ReleaseReason reason) {
semaphore.release(amount);
}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java
index 51e12e0a0..c9ce20f9b 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java
@@ -30,18 +30,4 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
*/
int requestBatch() throws InterruptedException;
-
- /**
- * Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
- * permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
- * can be specified.
- */
- void releaseBatch();
-
- /**
- * Return the configured batch size for this handler.
- * @return the batch size.
- */
- int getBatchSize();
-
}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java
index a0b753dc6..42202438b 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java
@@ -23,6 +23,7 @@ public class CompositeBackPressureHandler implements BatchAwareBackPressureHandl
private final List backPressureHandlers;
private final int batchSize;
+
private String id;
public CompositeBackPressureHandler(List backPressureHandlers, int batchSize) {
@@ -33,6 +34,9 @@ public CompositeBackPressureHandler(List backPressureHandle
@Override
public void setId(String id) {
this.id = id;
+ backPressureHandlers.stream().filter(IdentifiableContainerComponent.class::isInstance)
+ .map(IdentifiableContainerComponent.class::cast)
+ .forEach(bph -> bph.setId(bph.getClass().getSimpleName() + "-" + id));
}
@Override
@@ -45,16 +49,6 @@ public int requestBatch() throws InterruptedException {
return request(batchSize);
}
- @Override
- public void releaseBatch() {
- release(batchSize);
- }
-
- @Override
- public int getBatchSize() {
- return batchSize;
- }
-
@Override
public int request(int amount) throws InterruptedException {
int obtained = amount;
@@ -66,23 +60,16 @@ public int request(int amount) throws InterruptedException {
for (int i = 0; i < backPressureHandlers.size(); i++) {
int obtainedForBph = obtainedPerBph[i];
if (obtainedForBph > obtained) {
- if (amount == batchSize) {
- backPressureHandlers.get(i).release(amount);
- // FIXME what if we cannot acquire 'obtained' (< 'amount') permits?
- backPressureHandlers.get(i).request(obtained);
- }
- else {
- backPressureHandlers.get(i).release(obtainedForBph - obtained);
- }
+ backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED);
}
}
return obtained;
}
@Override
- public void release(int amount) {
+ public void release(int amount, ReleaseReason reason) {
for (BackPressureHandler handler : backPressureHandlers) {
- handler.release(amount);
+ handler.release(amount, reason);
}
}
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java
index 288664826..70ed3f306 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java
@@ -19,7 +19,7 @@
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -47,7 +47,7 @@ public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandl
private volatile CurrentThroughputMode currentThroughputMode;
- private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false);
+ private final AtomicInteger lowThroughputPermitsAcquired = new AtomicInteger(0);
private String id;
@@ -79,34 +79,31 @@ public String getId() {
}
@Override
- public int request(int amount) throws InterruptedException {
- if (amount == batchSize) {
- return requestBatch();
- }
- return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
+ public int requestBatch() throws InterruptedException {
+ return request(batchSize);
}
// @formatter:off
@Override
- public int requestBatch() throws InterruptedException {
+ public int request(int amount) throws InterruptedException {
return CurrentThroughputMode.LOW.equals(this.currentThroughputMode)
- ? requestInLowThroughputMode()
- : requestInHighThroughputMode();
+ ? requestInLowThroughputMode(amount)
+ : requestInHighThroughputMode(amount);
}
- private int requestInHighThroughputMode() throws InterruptedException {
- return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH)
- ? this.batchSize
- : tryAcquirePartial();
+ private int requestInHighThroughputMode(int amount) throws InterruptedException {
+ return tryAcquire(amount, CurrentThroughputMode.HIGH)
+ ? amount
+ : tryAcquirePartial(amount);
}
// @formatter:on
- private int tryAcquirePartial() throws InterruptedException {
+ private int tryAcquirePartial(int max) throws InterruptedException {
int availablePermits = this.semaphore.availablePermits();
if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) {
return 0;
}
- int permitsToRequest = Math.min(availablePermits, this.batchSize);
+ int permitsToRequest = Math.min(availablePermits, max);
CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode;
logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}",
permitsToRequest, availablePermits, this.id, currentThroughputModeNow);
@@ -114,7 +111,7 @@ private int tryAcquirePartial() throws InterruptedException {
return hasAcquiredPartial ? permitsToRequest : 0;
}
- private int requestInLowThroughputMode() throws InterruptedException {
+ private int requestInLowThroughputMode(int amount) throws InterruptedException {
// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
// so no actual concurrency
logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
@@ -123,11 +120,11 @@ private int requestInLowThroughputMode() throws InterruptedException {
if (hasAcquired) {
logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits());
// We've acquired all permits - there's no other process currently processing messages
- if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
+ if (this.lowThroughputPermitsAcquired.getAndSet(amount) != 0) {
logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
this.semaphore.availablePermits());
}
- return this.batchSize;
+ return amount;
}
else {
return 0;
@@ -150,19 +147,22 @@ private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputMo
}
@Override
- public void releaseBatch() {
- maybeSwitchToLowThroughputMode();
- int permitsToRelease = getPermitsToRelease(this.batchSize);
+ public void release(int amount, ReleaseReason reason) {
+ logger.trace("Releasing {} permits ({}) for {}. Permits left: {}", amount, reason, this.id,
+ this.semaphore.availablePermits());
+ switch (reason) {
+ case NONE_FETCHED -> maybeSwitchToLowThroughputMode();
+ case PARTIAL_FETCH -> maybeSwitchToHighThroughputMode(amount);
+ case PROCESSED, LIMITED -> {
+ // No need to switch throughput mode
+ }
+ }
+ int permitsToRelease = getPermitsToRelease(amount);
this.semaphore.release(permitsToRelease);
- logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
+ logger.debug("Released {} permits ({}) for {}. Permits left: {}", permitsToRelease, reason, this.id,
this.semaphore.availablePermits());
}
- @Override
- public int getBatchSize() {
- return this.batchSize;
- }
-
private void maybeSwitchToLowThroughputMode() {
if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration)
&& CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) {
@@ -172,37 +172,23 @@ private void maybeSwitchToLowThroughputMode() {
}
}
- @Override
- public void release(int amount) {
- if (amount == batchSize) {
- releaseBatch();
- return;
+ private void maybeSwitchToHighThroughputMode(int amount) {
+ if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
+ logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
+ this.semaphore.availablePermits());
+ this.currentThroughputMode = CurrentThroughputMode.HIGH;
}
- logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
- this.semaphore.availablePermits());
- maybeSwitchToHighThroughputMode(amount);
- int permitsToRelease = getPermitsToRelease(amount);
- this.semaphore.release(permitsToRelease);
- logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
- this.semaphore.availablePermits());
}
private int getPermitsToRelease(int amount) {
- return this.hasAcquiredFullPermits.compareAndSet(true, false)
+ int lowThroughputPermits = this.lowThroughputPermitsAcquired.getAndSet(0);
+ return lowThroughputPermits > 0
// The first process that gets here should release all permits except for inflight messages
// We can have only one batch of messages at this point since we have all permits
- ? this.totalPermits - (this.batchSize - amount)
+ ? this.totalPermits - (lowThroughputPermits - amount)
: amount;
}
- private void maybeSwitchToHighThroughputMode(int amount) {
- if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
- logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
- this.semaphore.availablePermits());
- this.currentThroughputMode = CurrentThroughputMode.HIGH;
- }
- }
-
@Override
public boolean drain(Duration timeout) {
logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(),
diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java
index e71dc4319..9041cd9c8 100644
--- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java
+++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java
@@ -17,6 +17,7 @@
import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.listener.BackPressureHandler;
+import io.awspring.cloud.sqs.listener.BackPressureHandler.ReleaseReason;
import io.awspring.cloud.sqs.listener.BatchAwareBackPressureHandler;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent;
@@ -214,7 +215,7 @@ private void pollAndEmitMessages() {
if (!isRunning()) {
logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits",
acquiredPermits);
- this.backPressureHandler.release(acquiredPermits);
+ this.backPressureHandler.release(acquiredPermits, ReleaseReason.NONE_FETCHED);
continue;
}
// @formatter:off
@@ -252,15 +253,12 @@ private void handlePollBackOff() {
protected abstract CompletableFuture> doPollForMessages(int messagesToRequest);
public Collection> releaseUnusedPermits(int permits, Collection> msgs) {
- if (msgs.isEmpty() && permits == this.backPressureHandler.getBatchSize()) {
- this.backPressureHandler.releaseBatch();
- logger.trace("Released batch of unused permits for queue {}", this.pollingEndpointName);
- }
- else {
- int permitsToRelease = permits - msgs.size();
- this.backPressureHandler.release(permitsToRelease);
- logger.trace("Released {} unused permits for queue {}", permitsToRelease, this.pollingEndpointName);
- }
+ int polledMessages = msgs.size();
+ int permitsToRelease = permits - polledMessages;
+ ReleaseReason releaseReason = polledMessages == 0 ? ReleaseReason.NONE_FETCHED : ReleaseReason.PARTIAL_FETCH;
+ this.backPressureHandler.release(permitsToRelease, releaseReason);
+ logger.trace("Released {} unused ({}) permits for queue {} (messages polled {})", permitsToRelease,
+ releaseReason, this.pollingEndpointName, polledMessages);
return msgs;
}
@@ -285,7 +283,7 @@ protected AcknowledgementCallback getAcknowledgementCallback() {
private void releaseBackPressure() {
logger.debug("Releasing permit for queue {}", this.pollingEndpointName);
- this.backPressureHandler.release(1);
+ this.backPressureHandler.release(1, ReleaseReason.PROCESSED);
}
private Void handleSinkException(Throwable t) {