From bb998d4cc4fb26435c6345b6c1ce7675b2478b05 Mon Sep 17 00:00:00 2001 From: Loic Rouchon Date: Tue, 7 Jan 2025 17:56:12 +0100 Subject: [PATCH] Introduce a CompositeBackPressureHandler allowing for composition of BackPressureHandlers (#1251) --- ...tractPipelineMessageListenerContainer.java | 2 +- .../sqs/listener/BackPressureHandler.java | 41 ++++++++- .../listener/BackPressureHandlerLimiter.java | 26 +++++- .../BatchAwareBackPressureHandler.java | 14 --- .../CompositeBackPressureHandler.java | 27 ++---- .../SemaphoreBackPressureHandler.java | 86 ++++++++----------- .../source/AbstractPollingMessageSource.java | 20 ++--- 7 files changed, 113 insertions(+), 103 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java index ec12bc607..401f07248 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java @@ -235,7 +235,7 @@ protected BackPressureHandler createBackPressureHandler() { .throughputConfiguration(containerOptions.getBackPressureMode()).build()); if (containerOptions.getBackPressureLimiter() != null) { backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(), - acquireTimeout, containerOptions.getStandbyLimitPollingInterval())); + acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize)); } return new CompositeBackPressureHandler(backPressureHandlers, batchSize); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java index 1d76d6589..f2ff274b1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java @@ -29,7 +29,7 @@ public interface BackPressureHandler { /** - * Request a number of permits. Each obtained permit allows the + * Requests a number of permits. Each obtained permit allows the * {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message. * @param amount the amount of permits to request. * @return the amount of permits obtained. @@ -38,11 +38,24 @@ public interface BackPressureHandler { int request(int amount) throws InterruptedException; /** - * Release the specified amount of permits. Each message that has been processed should release one permit, whether - * processing was successful or not. + * Releases the specified amount of permits for processed messages. Each message that has been processed should + * release one permit, whether processing was successful or not. + *

+ * This method can is called in the following use cases: + *

* @param amount the amount of permits to release. + * @param reason the reason why the permits were released. */ - void release(int amount); + void release(int amount, ReleaseReason reason); /** * Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and @@ -52,4 +65,24 @@ public interface BackPressureHandler { */ boolean drain(Duration timeout); + enum ReleaseReason { + /** + * Permits were not used because another BackPressureHandler has a lower permits limit and the difference need + * to be aligned across all handlers. + */ + LIMITED, + /** + * No messages were retrieved from SQS, so all permits need to be returned. + */ + NONE_FETCHED, + /** + * Some messages were fetched from SQS. Unused permits need to be returned. + */ + PARTIAL_FETCH, + /** + * The processing of one or more messages finished, successfully or not. + */ + PROCESSED; + } + } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java index 0ff48a412..cd031a129 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java @@ -27,7 +27,7 @@ * * @see BackPressureLimiter */ -public class BackPressureHandlerLimiter implements BackPressureHandler { +public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent { /** * The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment. @@ -54,11 +54,31 @@ public class BackPressureHandlerLimiter implements BackPressureHandler { private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0); + private final int batchSize; + + private String id; + public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout, - Duration standbyLimitPollingInterval) { + Duration standbyLimitPollingInterval, int batchSize) { this.backPressureLimiter = backPressureLimiter; this.acquireTimeout = acquireTimeout; this.standbyLimitPollingInterval = standbyLimitPollingInterval; + this.batchSize = batchSize; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getId() { + return id; + } + + @Override + public int requestBatch() throws InterruptedException { + return request(batchSize); } @Override @@ -75,7 +95,7 @@ public int request(int amount) throws InterruptedException { } @Override - public void release(int amount) { + public void release(int amount, ReleaseReason reason) { semaphore.release(amount); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java index 51e12e0a0..c9ce20f9b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java @@ -30,18 +30,4 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler { * @throws InterruptedException if the Thread is interrupted while waiting for permits. */ int requestBatch() throws InterruptedException; - - /** - * Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all - * permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits - * can be specified. - */ - void releaseBatch(); - - /** - * Return the configured batch size for this handler. - * @return the batch size. - */ - int getBatchSize(); - } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java index a0b753dc6..42202438b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java @@ -23,6 +23,7 @@ public class CompositeBackPressureHandler implements BatchAwareBackPressureHandl private final List backPressureHandlers; private final int batchSize; + private String id; public CompositeBackPressureHandler(List backPressureHandlers, int batchSize) { @@ -33,6 +34,9 @@ public CompositeBackPressureHandler(List backPressureHandle @Override public void setId(String id) { this.id = id; + backPressureHandlers.stream().filter(IdentifiableContainerComponent.class::isInstance) + .map(IdentifiableContainerComponent.class::cast) + .forEach(bph -> bph.setId(bph.getClass().getSimpleName() + "-" + id)); } @Override @@ -45,16 +49,6 @@ public int requestBatch() throws InterruptedException { return request(batchSize); } - @Override - public void releaseBatch() { - release(batchSize); - } - - @Override - public int getBatchSize() { - return batchSize; - } - @Override public int request(int amount) throws InterruptedException { int obtained = amount; @@ -66,23 +60,16 @@ public int request(int amount) throws InterruptedException { for (int i = 0; i < backPressureHandlers.size(); i++) { int obtainedForBph = obtainedPerBph[i]; if (obtainedForBph > obtained) { - if (amount == batchSize) { - backPressureHandlers.get(i).release(amount); - // FIXME what if we cannot acquire 'obtained' (< 'amount') permits? - backPressureHandlers.get(i).request(obtained); - } - else { - backPressureHandlers.get(i).release(obtainedForBph - obtained); - } + backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED); } } return obtained; } @Override - public void release(int amount) { + public void release(int amount, ReleaseReason reason) { for (BackPressureHandler handler : backPressureHandlers) { - handler.release(amount); + handler.release(amount, reason); } } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java index 288664826..70ed3f306 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java @@ -19,7 +19,7 @@ import java.util.Arrays; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -47,7 +47,7 @@ public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandl private volatile CurrentThroughputMode currentThroughputMode; - private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false); + private final AtomicInteger lowThroughputPermitsAcquired = new AtomicInteger(0); private String id; @@ -79,34 +79,31 @@ public String getId() { } @Override - public int request(int amount) throws InterruptedException { - if (amount == batchSize) { - return requestBatch(); - } - return tryAcquire(amount, this.currentThroughputMode) ? amount : 0; + public int requestBatch() throws InterruptedException { + return request(batchSize); } // @formatter:off @Override - public int requestBatch() throws InterruptedException { + public int request(int amount) throws InterruptedException { return CurrentThroughputMode.LOW.equals(this.currentThroughputMode) - ? requestInLowThroughputMode() - : requestInHighThroughputMode(); + ? requestInLowThroughputMode(amount) + : requestInHighThroughputMode(amount); } - private int requestInHighThroughputMode() throws InterruptedException { - return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH) - ? this.batchSize - : tryAcquirePartial(); + private int requestInHighThroughputMode(int amount) throws InterruptedException { + return tryAcquire(amount, CurrentThroughputMode.HIGH) + ? amount + : tryAcquirePartial(amount); } // @formatter:on - private int tryAcquirePartial() throws InterruptedException { + private int tryAcquirePartial(int max) throws InterruptedException { int availablePermits = this.semaphore.availablePermits(); if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) { return 0; } - int permitsToRequest = Math.min(availablePermits, this.batchSize); + int permitsToRequest = Math.min(availablePermits, max); CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode; logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}", permitsToRequest, availablePermits, this.id, currentThroughputModeNow); @@ -114,7 +111,7 @@ private int tryAcquirePartial() throws InterruptedException { return hasAcquiredPartial ? permitsToRequest : 0; } - private int requestInLowThroughputMode() throws InterruptedException { + private int requestInLowThroughputMode(int amount) throws InterruptedException { // Although LTM can be set / unset by many processes, only the MessageSource thread gets here, // so no actual concurrency logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id, @@ -123,11 +120,11 @@ private int requestInLowThroughputMode() throws InterruptedException { if (hasAcquired) { logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits()); // We've acquired all permits - there's no other process currently processing messages - if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) { + if (this.lowThroughputPermitsAcquired.getAndSet(amount) != 0) { logger.warn("hasAcquiredFullPermits was already true. Permits left: {}", this.semaphore.availablePermits()); } - return this.batchSize; + return amount; } else { return 0; @@ -150,19 +147,22 @@ private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputMo } @Override - public void releaseBatch() { - maybeSwitchToLowThroughputMode(); - int permitsToRelease = getPermitsToRelease(this.batchSize); + public void release(int amount, ReleaseReason reason) { + logger.trace("Releasing {} permits ({}) for {}. Permits left: {}", amount, reason, this.id, + this.semaphore.availablePermits()); + switch (reason) { + case NONE_FETCHED -> maybeSwitchToLowThroughputMode(); + case PARTIAL_FETCH -> maybeSwitchToHighThroughputMode(amount); + case PROCESSED, LIMITED -> { + // No need to switch throughput mode + } + } + int permitsToRelease = getPermitsToRelease(amount); this.semaphore.release(permitsToRelease); - logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id, + logger.debug("Released {} permits ({}) for {}. Permits left: {}", permitsToRelease, reason, this.id, this.semaphore.availablePermits()); } - @Override - public int getBatchSize() { - return this.batchSize; - } - private void maybeSwitchToLowThroughputMode() { if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration) && CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) { @@ -172,37 +172,23 @@ private void maybeSwitchToLowThroughputMode() { } } - @Override - public void release(int amount) { - if (amount == batchSize) { - releaseBatch(); - return; + private void maybeSwitchToHighThroughputMode(int amount) { + if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) { + logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id, + this.semaphore.availablePermits()); + this.currentThroughputMode = CurrentThroughputMode.HIGH; } - logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id, - this.semaphore.availablePermits()); - maybeSwitchToHighThroughputMode(amount); - int permitsToRelease = getPermitsToRelease(amount); - this.semaphore.release(permitsToRelease); - logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id, - this.semaphore.availablePermits()); } private int getPermitsToRelease(int amount) { - return this.hasAcquiredFullPermits.compareAndSet(true, false) + int lowThroughputPermits = this.lowThroughputPermitsAcquired.getAndSet(0); + return lowThroughputPermits > 0 // The first process that gets here should release all permits except for inflight messages // We can have only one batch of messages at this point since we have all permits - ? this.totalPermits - (this.batchSize - amount) + ? this.totalPermits - (lowThroughputPermits - amount) : amount; } - private void maybeSwitchToHighThroughputMode(int amount) { - if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) { - logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id, - this.semaphore.availablePermits()); - this.currentThroughputMode = CurrentThroughputMode.HIGH; - } - } - @Override public boolean drain(Duration timeout) { logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(), diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java index e71dc4319..9041cd9c8 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java @@ -17,6 +17,7 @@ import io.awspring.cloud.sqs.ConfigUtils; import io.awspring.cloud.sqs.listener.BackPressureHandler; +import io.awspring.cloud.sqs.listener.BackPressureHandler.ReleaseReason; import io.awspring.cloud.sqs.listener.BatchAwareBackPressureHandler; import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent; @@ -214,7 +215,7 @@ private void pollAndEmitMessages() { if (!isRunning()) { logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits", acquiredPermits); - this.backPressureHandler.release(acquiredPermits); + this.backPressureHandler.release(acquiredPermits, ReleaseReason.NONE_FETCHED); continue; } // @formatter:off @@ -252,15 +253,12 @@ private void handlePollBackOff() { protected abstract CompletableFuture> doPollForMessages(int messagesToRequest); public Collection> releaseUnusedPermits(int permits, Collection> msgs) { - if (msgs.isEmpty() && permits == this.backPressureHandler.getBatchSize()) { - this.backPressureHandler.releaseBatch(); - logger.trace("Released batch of unused permits for queue {}", this.pollingEndpointName); - } - else { - int permitsToRelease = permits - msgs.size(); - this.backPressureHandler.release(permitsToRelease); - logger.trace("Released {} unused permits for queue {}", permitsToRelease, this.pollingEndpointName); - } + int polledMessages = msgs.size(); + int permitsToRelease = permits - polledMessages; + ReleaseReason releaseReason = polledMessages == 0 ? ReleaseReason.NONE_FETCHED : ReleaseReason.PARTIAL_FETCH; + this.backPressureHandler.release(permitsToRelease, releaseReason); + logger.trace("Released {} unused ({}) permits for queue {} (messages polled {})", permitsToRelease, + releaseReason, this.pollingEndpointName, polledMessages); return msgs; } @@ -285,7 +283,7 @@ protected AcknowledgementCallback getAcknowledgementCallback() { private void releaseBackPressure() { logger.debug("Releasing permit for queue {}", this.pollingEndpointName); - this.backPressureHandler.release(1); + this.backPressureHandler.release(1, ReleaseReason.PROCESSED); } private Void handleSinkException(Throwable t) {