Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
loicrouchon committed Jan 3, 2025
1 parent 93cb447 commit 51bad1a
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -226,17 +227,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {

protected BackPressureHandler createBackPressureHandler() {
O containerOptions = getContainerOptions();
BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
.batchSize(containerOptions.getMaxMessagesPerPoll())
.totalPermits(containerOptions.getMaxConcurrentMessages())
.acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
int batchSize = containerOptions.getMaxMessagesPerPoll();
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
if (containerOptions.getBackPressureLimiter() != null) {
backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler,
containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(),
containerOptions.getMaxDelayBetweenPolls());
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
acquireTimeout, containerOptions.getStandbyLimitPollingInterval()));
}
return backPressureHandler;
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
}

protected TaskExecutor createSourcesTaskExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@
*
* @see BackPressureLimiter
*/
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler {

/**
* The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}.
*/
private final BatchAwareBackPressureHandler backPressureHandler;
public class BackPressureHandlerLimiter implements BackPressureHandler {

/**
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
Expand All @@ -59,50 +54,34 @@ public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler

private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);

public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler,
BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) {
this.backPressureHandler = backPressureHandler;
public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
Duration standbyLimitPollingInterval) {
this.backPressureLimiter = backPressureLimiter;
this.acquireTimeout = acquireTimeout;
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
}

@Override
public int requestBatch() throws InterruptedException {
int permits = updatePermitsLimit();
int batchSize = getBatchSize();
if (permits < batchSize) {
return acquirePermits(permits, backPressureHandler::request);
}
return acquirePermits(batchSize, p -> backPressureHandler.requestBatch());
}

@Override
public void releaseBatch() {
semaphore.release(getBatchSize());
backPressureHandler.releaseBatch();
}

@Override
public int getBatchSize() {
return backPressureHandler.getBatchSize();
}

@Override
public int request(int amount) throws InterruptedException {
int permits = Math.min(updatePermitsLimit(), amount);
return acquirePermits(permits, backPressureHandler::request);
if (permits == 0) {
Thread.sleep(standbyLimitPollingInterval.toMillis());
return 0;
}
if (semaphore.tryAcquire(permits, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
return permits;
}
return 0;
}

@Override
public void release(int amount) {
semaphore.release(amount);
backPressureHandler.release(amount);
}

@Override
public boolean drain(Duration timeout) {
return backPressureHandler.drain(timeout);
return true;
}

private int updatePermitsLimit() {
Expand All @@ -120,25 +99,6 @@ else if (newLimit > oldLimit) {
});
}

private interface PermitsRequester {
int request(int amount) throws InterruptedException;
}

private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException {
if (amount == 0) {
Thread.sleep(standbyLimitPollingInterval.toMillis());
return 0;
}
if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
int obtained = permitsRequester.request(amount);
if (obtained < amount) {
semaphore.release(amount - obtained);
}
return obtained;
}
return 0;
}

private static class ReducibleSemaphore extends Semaphore {

ReducibleSemaphore(int permits) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

import java.time.Duration;
import java.util.List;

public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {

private final List<BackPressureHandler> backPressureHandlers;

private final int batchSize;
private String id;

public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize) {
this.backPressureHandlers = backPressureHandlers;
this.batchSize = batchSize;
}

@Override
public void setId(String id) {
this.id = id;
}

@Override
public String getId() {
return id;
}

@Override
public int requestBatch() throws InterruptedException {
return request(batchSize);
}

@Override
public void releaseBatch() {
release(batchSize);
}

@Override
public int getBatchSize() {
return batchSize;
}

@Override
public int request(int amount) throws InterruptedException {
int obtained = amount;
int[] obtainedPerBph = new int[backPressureHandlers.size()];
for (int i = 0; i < backPressureHandlers.size() && obtained > 0; i++) {
obtainedPerBph[i] = backPressureHandlers.get(i).request(obtained);
obtained = Math.min(obtained, obtainedPerBph[i]);
}
for (int i = 0; i < backPressureHandlers.size(); i++) {
int obtainedForBph = obtainedPerBph[i];
if (obtainedForBph > obtained) {
if (amount == batchSize) {
backPressureHandlers.get(i).release(amount);
// FIXME what if we cannot acquire 'obtained' (< 'amount') permits?
backPressureHandlers.get(i).request(obtained);
}
else {
backPressureHandlers.get(i).release(obtainedForBph - obtained);
}
}
}
return obtained;
}

@Override
public void release(int amount) {
for (BackPressureHandler handler : backPressureHandlers) {
handler.release(amount);
}
}

@Override
public boolean drain(Duration timeout) {
boolean result = true;
for (BackPressureHandler handler : backPressureHandlers) {
result &= !handler.drain(timeout);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public String getId() {

@Override
public int request(int amount) throws InterruptedException {
if (amount == batchSize) {
return requestBatch();
}
return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
}

Expand Down Expand Up @@ -171,6 +174,10 @@ private void maybeSwitchToLowThroughputMode() {

@Override
public void release(int amount) {
if (amount == batchSize) {
releaseBatch();
return;
}
logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
this.semaphore.availablePermits());
maybeSwitchToHighThroughputMode(amount);
Expand Down

0 comments on commit 51bad1a

Please sign in to comment.