Skip to content

Commit

Permalink
Introduce a CompositeBackPressureHandler allowing for composition of …
Browse files Browse the repository at this point in the history
…BackPressureHandlers (awspring#1251)
  • Loading branch information
loicrouchon committed Jan 7, 2025
1 parent 93cb447 commit 432d490
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -226,17 +227,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {

protected BackPressureHandler createBackPressureHandler() {
O containerOptions = getContainerOptions();
BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
.batchSize(containerOptions.getMaxMessagesPerPoll())
.totalPermits(containerOptions.getMaxConcurrentMessages())
.acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
int batchSize = containerOptions.getMaxMessagesPerPoll();
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
if (containerOptions.getBackPressureLimiter() != null) {
backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler,
containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(),
containerOptions.getMaxDelayBetweenPolls());
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
}
return backPressureHandler;
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
}

protected TaskExecutor createSourcesTaskExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface BackPressureHandler {

/**
* Request a number of permits. Each obtained permit allows the
* Requests a number of permits. Each obtained permit allows the
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
* @param amount the amount of permits to request.
* @return the amount of permits obtained.
Expand All @@ -38,11 +38,24 @@ public interface BackPressureHandler {
int request(int amount) throws InterruptedException;

/**
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
* processing was successful or not.
* Releases the specified amount of permits for processed messages. Each message that has been processed should
* release one permit, whether processing was successful or not.
* <p>
* This method can is called in the following use cases:
* <ul>
* <li>{@link ReleaseReason#LIMITED}: permits were not used because another BackPressureHandler has a lower permits
* limit and the difference in permits needs to be returned.</li>
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
* from SQS. Permits need to be returned.</li>
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
* The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
* the permits will be used as the same number of messages were fetched from SQS.</li>
* <li>{@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.</li>
* </ul>
* @param amount the amount of permits to release.
* @param reason the reason why the permits were released.
*/
void release(int amount);
void release(int amount, ReleaseReason reason);

/**
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
Expand All @@ -52,4 +65,24 @@ public interface BackPressureHandler {
*/
boolean drain(Duration timeout);

enum ReleaseReason {
/**
* Permits were not used because another BackPressureHandler has a lower permits limit and the difference need
* to be aligned across all handlers.
*/
LIMITED,
/**
* No messages were retrieved from SQS, so all permits need to be returned.
*/
NONE_FETCHED,
/**
* Some messages were fetched from SQS. Unused permits need to be returned.
*/
PARTIAL_FETCH,
/**
* The processing of one or more messages finished, successfully or not.
*/
PROCESSED;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@
*
* @see BackPressureLimiter
*/
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler {

/**
* The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}.
*/
private final BatchAwareBackPressureHandler backPressureHandler;
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {

/**
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
Expand All @@ -59,50 +54,54 @@ public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler

private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);

public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler,
BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) {
this.backPressureHandler = backPressureHandler;
private final int batchSize;

private String id;

public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
Duration standbyLimitPollingInterval, int batchSize) {
this.backPressureLimiter = backPressureLimiter;
this.acquireTimeout = acquireTimeout;
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
this.batchSize = batchSize;
}

@Override
public int requestBatch() throws InterruptedException {
int permits = updatePermitsLimit();
int batchSize = getBatchSize();
if (permits < batchSize) {
return acquirePermits(permits, backPressureHandler::request);
}
return acquirePermits(batchSize, p -> backPressureHandler.requestBatch());
public void setId(String id) {
this.id = id;
}

@Override
public void releaseBatch() {
semaphore.release(getBatchSize());
backPressureHandler.releaseBatch();
public String getId() {
return id;
}

@Override
public int getBatchSize() {
return backPressureHandler.getBatchSize();
public int requestBatch() throws InterruptedException {
return request(batchSize);
}

@Override
public int request(int amount) throws InterruptedException {
int permits = Math.min(updatePermitsLimit(), amount);
return acquirePermits(permits, backPressureHandler::request);
if (permits == 0) {
Thread.sleep(standbyLimitPollingInterval.toMillis());
return 0;
}
if (semaphore.tryAcquire(permits, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
return permits;
}
return 0;
}

@Override
public void release(int amount) {
public void release(int amount, ReleaseReason reason) {
semaphore.release(amount);
backPressureHandler.release(amount);
}

@Override
public boolean drain(Duration timeout) {
return backPressureHandler.drain(timeout);
return true;
}

private int updatePermitsLimit() {
Expand All @@ -120,25 +119,6 @@ else if (newLimit > oldLimit) {
});
}

private interface PermitsRequester {
int request(int amount) throws InterruptedException;
}

private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException {
if (amount == 0) {
Thread.sleep(standbyLimitPollingInterval.toMillis());
return 0;
}
if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
int obtained = permitsRequester.request(amount);
if (obtained < amount) {
semaphore.release(amount - obtained);
}
return obtained;
}
return 0;
}

private static class ReducibleSemaphore extends Semaphore {

ReducibleSemaphore(int permits) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,4 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
*/
int requestBatch() throws InterruptedException;

/**
* Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
* permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
* can be specified.
*/
void releaseBatch();

/**
* Return the configured batch size for this handler.
* @return the batch size.
*/
int getBatchSize();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

import java.time.Duration;
import java.util.List;

public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {

private final List<BackPressureHandler> backPressureHandlers;

private final int batchSize;

private String id;

public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize) {
this.backPressureHandlers = backPressureHandlers;
this.batchSize = batchSize;
}

@Override
public void setId(String id) {
this.id = id;
backPressureHandlers.stream().filter(IdentifiableContainerComponent.class::isInstance)
.map(IdentifiableContainerComponent.class::cast)
.forEach(bph -> bph.setId(bph.getClass().getSimpleName() + "-" + id));
}

@Override
public String getId() {
return id;
}

@Override
public int requestBatch() throws InterruptedException {
return request(batchSize);
}

@Override
public int request(int amount) throws InterruptedException {
int obtained = amount;
int[] obtainedPerBph = new int[backPressureHandlers.size()];
for (int i = 0; i < backPressureHandlers.size() && obtained > 0; i++) {
obtainedPerBph[i] = backPressureHandlers.get(i).request(obtained);
obtained = Math.min(obtained, obtainedPerBph[i]);
}
for (int i = 0; i < backPressureHandlers.size(); i++) {
int obtainedForBph = obtainedPerBph[i];
if (obtainedForBph > obtained) {
backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED);
}
}
return obtained;
}

@Override
public void release(int amount, ReleaseReason reason) {
for (BackPressureHandler handler : backPressureHandlers) {
handler.release(amount, reason);
}
}

@Override
public boolean drain(Duration timeout) {
boolean result = true;
for (BackPressureHandler handler : backPressureHandlers) {
result &= !handler.drain(timeout);
}
return result;
}
}
Loading

0 comments on commit 432d490

Please sign in to comment.