Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamically configure SemaphoreBackPressureHandler with BackPressureLimiter (#1251) #1308

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final Duration maxDelayBetweenPolls;

private final Duration standbyLimitPollingInterval;

private final Duration listenerShutdownTimeout;

private final Duration acknowledgementShutdownTimeout;

private final BackPressureMode backPressureMode;

private final BackPressureLimiter backPressureLimiter;

private final ListenerMode listenerMode;

private final MessagingMessageConverter<?> messageConverter;
Expand Down Expand Up @@ -80,10 +84,12 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
this.autoStartup = builder.autoStartup;
this.pollTimeout = builder.pollTimeout;
this.pollBackOffPolicy = builder.pollBackOffPolicy;
this.standbyLimitPollingInterval = builder.standbyLimitPollingInterval;
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
this.backPressureMode = builder.backPressureMode;
this.backPressureLimiter = builder.backPressureLimiter;
this.listenerMode = builder.listenerMode;
this.messageConverter = builder.messageConverter;
this.acknowledgementMode = builder.acknowledgementMode;
Expand Down Expand Up @@ -122,6 +128,11 @@ public BackOffPolicy getPollBackOffPolicy() {
return this.pollBackOffPolicy;
}

@Override
public Duration getStandbyLimitPollingInterval() {
return this.standbyLimitPollingInterval;
}

@Override
public Duration getMaxDelayBetweenPolls() {
return this.maxDelayBetweenPolls;
Expand Down Expand Up @@ -154,6 +165,11 @@ public BackPressureMode getBackPressureMode() {
return this.backPressureMode;
}

@Override
public BackPressureLimiter getBackPressureLimiter() {
return this.backPressureLimiter;
}

@Override
public ListenerMode getListenerMode() {
return this.listenerMode;
Expand Down Expand Up @@ -206,6 +222,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final BackOffPolicy DEFAULT_POLL_BACK_OFF_POLICY = buildDefaultBackOffPolicy();

private static final Duration DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL = Duration.ofMillis(100);

private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);

private static final Duration DEFAULT_LISTENER_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20);
Expand All @@ -214,6 +232,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;

private static final BackPressureLimiter DEFAULT_BACKPRESSURE_LIMITER = null;

private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;

private static final MessagingMessageConverter<?> DEFAULT_MESSAGE_CONVERTER = new SqsMessagingMessageConverter();
Expand All @@ -230,10 +250,14 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private BackOffPolicy pollBackOffPolicy = DEFAULT_POLL_BACK_OFF_POLICY;

private Duration standbyLimitPollingInterval = DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL;

private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;

private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;

private BackPressureLimiter backPressureLimiter = DEFAULT_BACKPRESSURE_LIMITER;

private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;

private Duration acknowledgementShutdownTimeout = DEFAULT_ACKNOWLEDGEMENT_SHUTDOWN_TIMEOUT;
Expand Down Expand Up @@ -272,6 +296,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
this.backPressureMode = options.backPressureMode;
this.backPressureLimiter = options.backPressureLimiter;
this.listenerMode = options.listenerMode;
this.messageConverter = options.messageConverter;
this.acknowledgementMode = options.acknowledgementMode;
Expand Down Expand Up @@ -315,6 +340,13 @@ public B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
return self();
}

@Override
public B standbyLimitPollingInterval(Duration standbyLimitPollingInterval) {
Assert.notNull(standbyLimitPollingInterval, "standbyLimitPollingInterval cannot be null");
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
return self();
}

@Override
public B maxDelayBetweenPolls(Duration maxDelayBetweenPolls) {
Assert.notNull(maxDelayBetweenPolls, "semaphoreAcquireTimeout cannot be null");
Expand Down Expand Up @@ -364,6 +396,12 @@ public B backPressureMode(BackPressureMode backPressureMode) {
return self();
}

@Override
public B backPressureLimiter(BackPressureLimiter backPressureLimiter) {
this.backPressureLimiter = backPressureLimiter;
return self();
}

@Override
public B acknowledgementInterval(Duration acknowledgementInterval) {
Assert.notNull(acknowledgementInterval, "acknowledgementInterval cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -225,10 +226,18 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
}

protected BackPressureHandler createBackPressureHandler() {
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
O containerOptions = getContainerOptions();
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
int batchSize = containerOptions.getMaxMessagesPerPoll();
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
if (containerOptions.getBackPressureLimiter() != null) {
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
}
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
}

protected TaskExecutor createSourcesTaskExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface BackPressureHandler {

/**
* Request a number of permits. Each obtained permit allows the
* Requests a number of permits. Each obtained permit allows the
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
* @param amount the amount of permits to request.
* @return the amount of permits obtained.
Expand All @@ -38,11 +38,24 @@ public interface BackPressureHandler {
int request(int amount) throws InterruptedException;

/**
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
* processing was successful or not.
* Releases the specified amount of permits for processed messages. Each message that has been processed should
* release one permit, whether processing was successful or not.
* <p>
* This method can is called in the following use cases:
* <ul>
* <li>{@link ReleaseReason#LIMITED}: permits were not used because another BackPressureHandler has a lower permits
* limit and the difference in permits needs to be returned.</li>
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
* from SQS. Permits need to be returned.</li>
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
* The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
* the permits will be used as the same number of messages were fetched from SQS.</li>
* <li>{@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.</li>
* </ul>
* @param amount the amount of permits to release.
* @param reason the reason why the permits were released.
*/
void release(int amount);
void release(int amount, ReleaseReason reason);

/**
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
Expand All @@ -52,4 +65,24 @@ public interface BackPressureHandler {
*/
boolean drain(Duration timeout);

enum ReleaseReason {
/**
* Permits were not used because another BackPressureHandler has a lower permits limit and the difference need
* to be aligned across all handlers.
*/
LIMITED,
/**
* No messages were retrieved from SQS, so all permits need to be returned.
*/
NONE_FETCHED,
/**
* Some messages were fetched from SQS. Unused permits need to be returned.
*/
PARTIAL_FETCH,
/**
* The processing of one or more messages finished, successfully or not.
*/
PROCESSED;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A {@link BatchAwareBackPressureHandler} implementation that uses an internal {@link Semaphore} for adapting the
* maximum number of permits that can be acquired by the {@link #backPressureHandler} based on the downstream
* backpressure limit computed by the {@link #backPressureLimiter}.
*
* @see BackPressureLimiter
*/
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {

/**
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
*/
private final BackPressureLimiter backPressureLimiter;

/**
* The duration to wait for permits to be acquired.
*/
private final Duration acquireTimeout;

/**
* The duration to sleep when the queue processing is in standby.
*/
private final Duration standbyLimitPollingInterval;

/**
* The limit of permits that can be acquired at the current time. The permits limit is defined in the [0,
* Integer.MAX_VALUE] interval. A value of {@literal 0} means that no permits can be acquired.
* <p>
* This value is updated based on the downstream backpressure reported by the {@link #backPressureLimiter}.
*/
private final AtomicInteger permitsLimit = new AtomicInteger(0);

private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);

private final int batchSize;

private String id;

public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
Duration standbyLimitPollingInterval, int batchSize) {
this.backPressureLimiter = backPressureLimiter;
this.acquireTimeout = acquireTimeout;
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
this.batchSize = batchSize;
}

@Override
public void setId(String id) {
this.id = id;
}

@Override
public String getId() {
return id;
}

@Override
public int requestBatch() throws InterruptedException {
return request(batchSize);
}

@Override
public int request(int amount) throws InterruptedException {
int permits = Math.min(updatePermitsLimit(), amount);
if (permits == 0) {
Thread.sleep(standbyLimitPollingInterval.toMillis());
return 0;
}
if (semaphore.tryAcquire(permits, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
return permits;
}
return 0;
}

@Override
public void release(int amount, ReleaseReason reason) {
semaphore.release(amount);
}

@Override
public boolean drain(Duration timeout) {
return true;
}

private int updatePermitsLimit() {
return permitsLimit.updateAndGet(oldLimit -> {
int newLimit = Math.max(0, backPressureLimiter.limit());
if (newLimit < oldLimit) {
int blockedPermits = oldLimit - newLimit;
semaphore.reducePermits(blockedPermits);
}
else if (newLimit > oldLimit) {
int releasedPermits = newLimit - oldLimit;
semaphore.release(releasedPermits);
}
return newLimit;
});
}

private static class ReducibleSemaphore extends Semaphore {

ReducibleSemaphore(int permits) {
super(permits);
}

@Override
public void reducePermits(int reduction) {
super.reducePermits(reduction);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

/**
* The BackPressureLimiter enables a dynamic reduction of the queues consumption capacity depending on external factors.
*/
public interface BackPressureLimiter {

/**
* {@return the limit to be applied to the queue consumption.}
*
* The limit can be used to reduce the queue consumption capabilities of the next polling attempts. The container
* will work toward satisfying the limit by decreasing the maximum number of concurrent messages that can ve
* processed.
*
* The following values will have the following effects:
*
* <ul>
* <li>zero or negative limits will stop consumption from the queue. When such a situation occurs, the queue
* processing is said to be on "standby".</li>
* <li>Values >= 1 and < {@link ContainerOptions#getMaxConcurrentMessages()} will reduce the queue consumption
* capabilities of the next polling attempts.</li>
* <li>Values >= {@link ContainerOptions#getMaxConcurrentMessages()} will not reduce the queue consumption
* capabilities</li>
* </ul>
*
* Note: the adjustment will require a few polling cycles to be in effect.
*/
int limit();
}
Loading
Loading