
Dynamically configure SemaphoreBackPressureHandler with BackPressureLimiter (#1251) #1308

Open
wants to merge 3 commits into base: main

Conversation

loicrouchon

📢 Type of change

  • Bugfix
  • New feature
  • Enhancement
  • Refactoring

📜 Description

This change enhances the SemaphoreBackPressureHandler with support for a new BackPressureLimiter interface.

The BackPressureLimiter interface is to be implemented by applications. It has a single method, int limit(), that returns the number of permits that can be consumed by the SemaphoreBackPressureHandler.

Before each polling attempt, the SemaphoreBackPressureHandler checks the limit and adjusts the number of permits that can be requested (in the range [0, totalPermits]). The limit returned by BackPressureLimiter#limit() is to be understood as the number of messages that can be consumed from the queue at the current instant. If it is 0 (or less), queue consumption is considered to be on standby.
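Restated as a sketch (interface name and single method taken from the description above; the clamping helper is illustrative, not the actual implementation):

```java
// Sketch of the described contract. BackPressureLimiter is the interface
// this PR introduces; PermitClamp is a hypothetical helper showing the
// [0, totalPermits] adjustment described above.
interface BackPressureLimiter {
    // Number of messages that may be consumed from the queue right now.
    // A value of 0 (or less) puts queue consumption on standby.
    int limit();
}

class PermitClamp {
    static int effectivePermits(BackPressureLimiter limiter, int totalPermits) {
        return Math.max(0, Math.min(limiter.limit(), totalPermits));
    }
}
```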

When a poll is attempted while consumption is on standby, the SemaphoreBackPressureHandler will sleep for the standbyLimitPollingInterval before allowing the next polling attempt (we cannot rely on the semaphore acquire timeouts here, hence the need for standbyLimitPollingInterval).

Both the BackPressureLimiter and standbyLimitPollingInterval can be configured via the ContainerOptions.

💡 Motivation and Context

The goal of this change is to address #1251.

#1251 aims to provide a more general solution to issues like #481 by giving users control over how they would like to dynamically limit message consumption from an SQS queue. Typical use cases could be rate limiters (like #481) or more elaborate setups involving measuring the load of a downstream system and adjusting or stopping message consumption accordingly.

💚 How did you test it?

So far, the change has only been tested via integration tests covering various scenarios:

  • A static limit of 0, of totalPermits or more, and various values in between.
  • A dynamic limit changing, with checks that queue processing adjusts itself (in a "synchronized" way).
  • A dynamic limit changing, with checks that queue processing adjusts itself (not in a synchronized way, which is more realistic of what will happen in practice, but makes strict assertions on the moving target harder).

📝 Checklist

  • I reviewed submitted code
  • I added tests to verify changes
  • I updated reference documentation to reflect the change
  • All tests passing
  • No breaking changes

🔮 Next steps

  • Collecting feedback on the approach
  • Work on updating the reference documentation to mention the new capability.

@github-actions github-actions bot added the component: sqs SQS integration related issue label Dec 20, 2024
@jeroenvandevelde

@loicrouchon If I read this correctly and I wanted to build #481 with, for example, processing 5 messages every second:

I would need to implement the BackPressureLimiter with logic that sets/increases the limit to/by 5 every second?
As every message received would subtract one from the BackPressureLimiter.

@loicrouchon
Author

@jeroenvandevelde, the goal of the BackPressureLimiter is to provide information on the maximum number of messages that can be read from the queue in the next polling cycle. This is done by calling BackPressureLimiter#limit() before trying to acquire permits to poll messages from the queue.

However, it does not consider how many messages are currently being processed by the queue consumer, nor does it re-check BackPressureLimiter#limit() after obtaining permits to see whether this number should be reduced.

So to implement a rate limiter, you would need a 2-step approach.

  1. Rate-limit implementation: In the queue consumer, protect calls to the rate-limited API so that they effectively wait before an API call is sent when you're over the limit. This ensures the hard constraint on the rate limit is applied.
  2. Making the queue rate-limit friendly: Configure the SQS container with a BackPressureLimiter#limit() that reads the current API rate and translates it into a limit. This allows queue consumption to avoid polling messages that would have to wait and therefore potentially exceed the visibility timeout.

In case the rate limit is not a hard limit (i.e. you can temporarily have short bursts over the limit), then step 1 is not necessary.
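To make the two steps concrete, here is a hedged sketch (hypothetical class, fixed-window strategy, window scheduling omitted) of how one component could serve both roles: the consumer guards the API call with it, and its remaining budget is what a BackPressureLimiter#limit() implementation would return.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical fixed-window rate limiter combining both steps: the consumer
// calls tryAcquire() before each rate-limited API call (step 1), and
// limit() is what a BackPressureLimiter implementation would return (step 2).
class WindowedRateLimiter {
    private final int maxPerWindow;
    private final AtomicInteger remaining;

    WindowedRateLimiter(int maxPerWindow) {
        this.maxPerWindow = maxPerWindow;
        this.remaining = new AtomicInteger(maxPerWindow);
    }

    // Step 1: guard in the queue consumer; false means "over the limit".
    boolean tryAcquire() {
        if (remaining.decrementAndGet() < 0) {
            remaining.incrementAndGet();
            return false;
        }
        return true;
    }

    // Step 2: remaining budget, exposed via BackPressureLimiter#limit().
    int limit() {
        return Math.max(0, remaining.get());
    }

    // Called by a scheduler at each window boundary (scheduling elided).
    void resetWindow() {
        remaining.set(maxPerWindow);
    }
}
```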

So rate limiting can somewhat be performed, but it requires extra care if there are hard constraints on the rate limit. The solution implemented in this PR primarily aims at use cases where the pressure is measured from an external system, in which case there is by design a delay between measuring the pressure on the downstream system and reacting to it.

A good example would be when pushing messages to another system (which could be another queue, for example). In this case, BackPressureLimiter#limit() could be built so that it measures the pressure on the system and maps the pressure level to a certain limit.

For example, in the case the downstream system is an SQS queue:

  • If the number of visible messages < 10, then limit = total permits (full speed)
  • If the number of visible messages < 100, then limit = 50% of total permits (reduced speed)
  • If the number of visible messages ≥ 100, then limit = 0 (standby)
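As a sketch (hypothetical class name, thresholds taken from the bullets above):

```java
// Maps a downstream SQS queue's visible-message count to a permit limit,
// following the three-tier example above. Hypothetical, illustrative only.
class DownstreamQueueLimiter {
    static int limit(int visibleMessages, int totalPermits) {
        if (visibleMessages < 10) {
            return totalPermits;      // full speed
        }
        if (visibleMessages < 100) {
            return totalPermits / 2;  // reduced speed
        }
        return 0;                     // standby
    }
}
```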

PS: the advantages of pacing message publication to a downstream queue, as in the example, might not be obvious, but there are two:

  • The usual resource-consumption one: it avoids consuming cluster resources (CPU, memory) and other systems' resources involved in publishing that message, so that critical systems have more bandwidth to catch up.
  • Allowing different publishers to publish messages with different priorities. One publisher might be low priority and should stop publishing messages if there is high load, while another might have a higher priority and should not stop yet.

@jeroenvandevelde

jeroenvandevelde commented Dec 21, 2024

@loicrouchon I missed the fact that if it reads a message, it doesn't subtract it from the limit.

In my case, where I would like a max rate of x messages/second, your approach seems a bit weird, as all the information for this is available in the library:

How fast we are processing, and the rate at which we would like to go (a configurable value).
I would therefore prefer not to push onto the user of the library the complexity of keeping track of how many messages we are retrieving and changing the limit based on that.

I can follow if it is based on parameters outside of the library, which your case sounds like.
I will try to find some time in the coming days to see if I can find an alternative way to do this.

@loicrouchon
Author

How fast we are processing, and the rate at which we would like to go (a configurable value).
I would therefore prefer not to push onto the user of the library the complexity of keeping track of how many messages we are retrieving and changing the limit based on that.

@jeroenvandevelde I think this information is only fully available in a very limited number of use cases. Most of the queue consumer implementations I had to implement in the past made a variable number of API calls. It usually depended on the content of the message being consumed, some persistent state, or the result of previous API calls.

So limiting consumption from the queue at a fixed rate per second does not account for cases where the rate-limited API we're trying to protect would be called a variable number of times (0 or more). That is why, in the solution presented here, BackPressureLimiter#limit() measures the pressure and then translates it into how many messages can be consumed at that point in time. It does not compute how many it allowed or how many it can still allow.

I totally agree with you that it is more complicated than the use case you described. But it avoids the issue of blocking queue consumption for messages that did not trigger the API call that should be rate-limited.

I will try to find some time in the coming days to see if I can find an alternative way to do this.

Please do so, and let me know about your findings.

@jeroenvandevelde

jeroenvandevelde commented Dec 21, 2024

I think this information is only fully available in a very limited number of use cases. Most of the queue consumer implementations I had to implement in the past made a variable number of API calls. It usually depended on the content of the message being consumed, some persistent state, or the result of previous API calls.

Whether these situations are very limited or a normal use case will depend a lot on your context.
Maybe both situations require separate solutions, each with its own amount of complexity.
I totally understand that your solution gives room for more fine-grained rate limiting, as you can increase/decrease based on input from other systems.

We have designed our system so that every HTTP call to an (external) endpoint has a queue in front of it to handle overflow.
We do a kind of intake with both internal and external APIs, after which we get x requests/second; especially with external APIs it is quite common that you pay for/get x requests/second.

This indeed doesn't cover the situation where system A is not going at full speed and therefore system B could go faster against the same endpoint.

@tomazfernandes
Contributor

Hi @loicrouchon and @jeroenvandevelde.

@loicrouchon, overall this looks very promising - thanks!

I wonder why you chose to change the current SemaphoreBackPressureHandler implementation though - it seems like the functionalities are complementary to each other? Would it make sense to have this logic in a separate class, maybe a wrapper as you suggested?

Also, would it make sense maybe to have a list of BackPressureHandlers and have each return a number of permits they'll allow for that poll or something similar?

It also seems like the changes might fix #1187, which is an outstanding issue, so that's great.

Sorry I couldn't discuss this earlier with you in the issue, and thanks again to you both for bringing these enhancements up!

@loicrouchon
Author

Hi @tomazfernandes and Happy New Year!

I wonder why you chose to change the current SemaphoreBackPressureHandler implementation though - it seems like the functionalities are complementary to each other? Would it make sense to have this logic in a separate class, maybe a wrapper as you suggested?

I thought about doing so, but gave up because of the release method, which was somewhat complicated to wrap. But that was at the beginning of my attempts. I now have a much better understanding of the SemaphoreBackPressureHandler and can try to revisit that aspect, as I agree with you that it would be better to have a composable approach, to keep complexity minimal in each component.

I'll keep you posted about my progress.

@loicrouchon
Author

@tomazfernandes I pushed a version using a wrapper over the SemaphoreBackPressureHandler. I'm quite happy with how it simplifies things now, and I'm looking for feedback before continuing with the PR (maybe renaming a few things to improve clarity, and updating the reference documentation).

Also, would it make sense maybe to have a list of BackPressureHandlers and have each return a number of permits they'll allow for that poll or something similar?

I'm not sure; I think it can get tricky very quickly when it comes to the requestBatch/releaseBatch/getBatchSize methods.

However, if you would like to limit the number of permits via different BackPressureLimiters, nothing prevents creating another BackPressureLimiter whose implementation calls the other ones and returns the min value. Would that do the trick you were mentioning?
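A minimal sketch of that min-combining idea (the interface is redeclared here so the snippet stands alone; the real one is defined in this PR):

```java
import java.util.List;

interface BackPressureLimiter {
    int limit();
}

// Combines several limiters by returning the most restrictive value.
// Hypothetical class, illustrative only.
class MinOfLimiters implements BackPressureLimiter {
    private final List<BackPressureLimiter> delegates;

    MinOfLimiters(List<BackPressureLimiter> delegates) {
        this.delegates = delegates;
    }

    @Override
    public int limit() {
        return delegates.stream()
                .mapToInt(BackPressureLimiter::limit)
                .min()
                .orElse(Integer.MAX_VALUE); // no delegates: no restriction
    }
}
```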

It also seems like the changes might fix #1187, which is an outstanding issue, so that's great.

Regarding this, I'm not 100% sure it would fix it. So I would need to look more into it.

@tomazfernandes
Contributor

tomazfernandes commented Jan 2, 2025

Thanks for the update @loicrouchon, I think we're definitely moving in the right direction here.

Also, would it make sense maybe to have a list of BackPressureHandlers and have each return a number of permits they'll allow for that poll or something similar?

I'm not sure, I think it can get tricky very quickly when it comes to the requestBatch/releaseBatch/getBatchSize methods.

I'll share what's in my mind for your consideration - I think it's similar to what you have but with a slightly different approach that might simplify things a bit while bringing more flexibility. I might be missing something though.

I think each BackPressureHandler implementation accounts for a specific capacity: SemaphoreBPH accounts for the app's capacity, RateLimitingBPH could account for a downstream API capacity, and we could have another BPH implementation for checking a downstream queue capacity.

So, we could have a CompositeBackPressureHandler implementation of BatchAwareBPH that would contain a list of BPHs that it would iterate on per poll.

The batch methods themselves from BatchAwareBPH are really just a way to request and release a predictable amount of permits each time, and the CompositeBPH could have the batch size information.

On requestBatch or request calls, CBPH would delegate to the inner BPHs, calling either requestBatch directly or request(batchSize) if the instance is not BatchAware. We'd keep track of the amount of permits each BPH returns and pick the smallest number.

We'd then call release on the difference for each BPH and finally return the chosen number of permits.
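A minimal sketch of that request/keep-the-minimum/release-the-difference flow (simplified, hypothetical signatures; the real BPH interface also has the batch methods):

```java
import java.util.List;

// Hypothetical, simplified handler contract for illustration only.
interface Handler {
    int request(int amount); // returns the permits actually granted
    void release(int amount);
}

class CompositeRequest {
    // Ask every handler for the batch, keep the smallest grant, and hand
    // the surplus back to the handlers that granted more than the minimum.
    static int requestBatch(List<Handler> handlers, int batchSize) {
        int[] granted = new int[handlers.size()];
        int min = batchSize;
        for (int i = 0; i < handlers.size(); i++) {
            granted[i] = handlers.get(i).request(batchSize);
            min = Math.min(min, granted[i]);
        }
        for (int i = 0; i < handlers.size(); i++) {
            if (granted[i] > min) {
                handlers.get(i).release(granted[i] - min);
            }
        }
        return min;
    }
}

// Toy handler backed by a fixed pool of permits, used to exercise the flow.
class FixedHandler implements Handler {
    private int available;
    int released = 0;
    FixedHandler(int available) { this.available = available; }
    @Override public int request(int amount) {
        int granted = Math.min(amount, available);
        available -= granted;
        return granted;
    }
    @Override public void release(int amount) {
        released += amount;
        available += amount;
    }
}
```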

The benefit I see for this approach is that we can keep each BPH implementation with its own independent logic and reacting to request and release calls independently, while also not introducing a new interface which would also add complexity.

We could also in the future separate the Low/High Throughput logic into its own BPH to better manage complexity, and slim down SemaphoreBPH, which TBH would make me very happy.

Example Scenario

I'll illustrate with an example just as a sanity check:

Let's say we have 3 BPH implementations in the CompositeBPH:

  • SemaphoreBPH has 50 permits representing the app's maximum capacity
  • RateLimiterBPH has 20 permits / second representing a downstream API capacity
  • DownstreamQueueBPH has a variable number of permits depending on a downstream queue
  • Our batch size will be 20 for this example.

On the first poll, SBPH will return 20 permits, RLBPH will return 20 permits, and DQBPH, let's say, will return 20 permits. CBPH will then return 20 permits.

On the second poll, SBPH will return 20 permits, RLBPH will return 0 permits, and DQBPH will, let's say, return 20 permits again. CBPH will call release with 20 permits on SBPH, with 0 permits on RLBPH, and with 20 on DQBPH, and return 0 permits. We could of course short-circuit as soon as a BPH returns 0 permits, since that means no permits will be returned overall anyway.

Let's say at some point the downstream API is holding requests with a 10 second poorly configured timeout. The RateLimiterBPH would keep allowing 20 permits / second, but the SemaphoreBPH would cap consumption at 50 concurrent messages, avoiding having 200 simultaneous requests which might break both the app and the downstream API.

As the consumers release permits, CompositeBPH would delegate release calls accordingly to all implementations, some of which would likely be no-ops, such as in the RateLimiterBPH.

I don't think this is too different from what you are proposing, in that we're effectively limiting the amount of permits returned, and the logic for CompositeBPH might even be similar to the one you have in the BackPressureHandlerLimiter, but I think it simplifies things a bit by using only the two existing interfaces (BPH and BABPH), and adds more flexibility for the user to just add any number of BPH implementations that operate independently from each other.

Of course, there's a lot of complexity involved and I might be missing something.

Please let me know your thoughts.

Thanks.

@jeroenvandevelde

jeroenvandevelde commented Jan 3, 2025

@tomazfernandes

I agree with your reasoning, only two points of feedback.

  1. Your suggestion and the current implementation differ in that the current DownstreamQueueBPH uses a percentage of the permits and not an actual amount of permits. This means that we would push some complexity to the downstream service to keep track of the "normal" amount of permits of each service.

    For me, this extra complexity seems acceptable, as it gives us the option to have one abstraction/interface to cover all the BPHs.

  2. If I read your proposal correctly, you suggest using the request and release methods in the BPH interface.
    However, I want to point out a complexity if we go down that route.

    If the first BPH says 20 permits, but the second BPH says only 5,
    what will happen then? Call release 15 times on the first BPH?
    This is possible, but then your BPH needs to support this; I don't think most RateLimiterBPHs will support this, as you already pointed out.

    It might be better to introduce a getAvailablePermits method on the interface,
    which the orchestrator of the BPHs would call, then request the lowest amount it retrieves from all the BPHs.

    We would still need to cover the case where getAvailablePermits returns 20 but when request is called you can only retrieve 5. As this situation should happen much less often, it might be okay to still go for 20 items.
    If we would still want to go with 5, we could call release 15 times and hope the BPH supports it.

@loicrouchon
Author

@tomazfernandes, this is a much bigger scope than I initially intended, but I do see the value in it. I'll work on something along those lines.

If the first BPH says 20 permits, but the second BPH says only 5,
what will happen then? Call release 15 times on the first BPH?
This is possible, but then your BPH needs to support this; I don't think most RateLimiterBPHs will support this, as you already pointed out.

I think this is not a problem (there is a release(int amount) method on the BPH interface). Some implementations would implement it (like the SemaphoreBPH), while others would not implement it at all, like the DownstreamQueueBPH (because this one measures an effect that already happened on the system). But the RateLimiterBPH could implement it, as it should know how many requests were allowed through in the current period; if a lower number of permits was consumed instead of the full amount, then it needs to be notified of those.

For example, let's say the RateLimiterBPH says 20, but the SemaphoreBPH says 5. A few moments later, on a new round (but still in the same rate-limiting time window), the RateLimiterBPH should say 15 and not 0, as only 5 of the 20 were used. Hence I believe the RateLimiterBPH should implement the release method.
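A sketch of that bookkeeping (hypothetical class; window reset and thread safety elided):

```java
// Rate-limiter permit accounting where release(int) returns unused permits
// to the current window's budget, as in the 20-requested / 5-used example.
class RateLimiterPermits {
    private int remainingInWindow;

    RateLimiterPermits(int permitsPerWindow) {
        this.remainingInWindow = permitsPerWindow;
    }

    int request(int amount) {
        int granted = Math.min(amount, remainingInWindow);
        remainingInWindow -= granted;
        return granted;
    }

    void release(int amount) {
        remainingInWindow += amount;
    }

    int remaining() {
        return remainingInWindow;
    }
}
```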

It might be better to introduce a getAvailablePermits method on the interface,
which the orchestrator of the BPHs would call, then request the lowest amount it retrieves from all the BPHs.

I considered this approach. It could in theory allow removing all semaphore logic from the BPHs and keeping it only in the composite one. But what I struggle with is the current high/low throughput mode of the SemaphoreBPH, as this one has logic, in high-throughput mode, to acquire all available permits in case it timed out when trying to acquire the full amount requested. On top of that, the SemaphoreBPH and RateLimiterBPH would still need a kind of release method to keep track of how many permits were "not consumed" and are still potentially usable for the next round.

With that being said, I think at the current moment implementing a composite BPH containing individual BPHs is the best approach. The main downside I see is that it might wait too long in case it waits for more permits than necessary because other BPHs reduce this number. But to fix this, I can only think of a 2-step approach: first figure out how much can be requested, then call the request methods with the lowest number. I'm not sure we want to do that.

@tomazfernandes I have a question: if we introduce the CompositeBPH, do we need to keep the BatchAwareBackPressureHandler interface? I see it is only used in the AbstractPollingMessageSource, so we could implement this batching logic at the CompositeBPH level, which would translate it into request(amount)/release(amount) calls to all other BPHs. This could simplify their implementation, no?

@loicrouchon
Author

More questions around the BatchAwareBackPressureHandler.

The behavior of the SemaphoreBPH#requestBatch method is not equivalent to calling request(batchSize). This means that in order to keep the current behavior of the SemaphoreBPH when wrapped in the CompositeBPH, the CompositeBPH needs to map requestBatch calls to requestBatch calls on each contained BPH.

But if one of the BPHs limits to something smaller than the batch size, then we have a problem. There are two cases:

  1. The first BPH#requestBatch returns less than the batchSize. In this case, we can downgrade calls on the next BPHs to request(amount), which might be fine (there might still be a problem with the release method, though).
  2. The first BPH#requestBatch succeeds (returns the batchSize), but one of the following BPHs does not. Here lies the problem: we should release the unused permits, but given they were acquired using requestBatch, we should use the releaseBatch method, which is not symmetric with release(batchSize). So in this case, we would have to do a releaseBatch followed by a request(amount) with the reduced amount, but that sounds terrible.

We could maybe fix this by requiring that requestBatch/releaseBatch be equivalent to request/release(batchSize). This can be done for individual BPHs by applying the batch behavior in the request(amount)/release(amount) methods when the amount is equal to the batchSize.
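For instance (a hypothetical sketch of that equivalence, with the actual permit-acquisition logic elided; the counters only exist to make the dispatch visible):

```java
// Sketch: requestBatch is defined as request(batchSize), and request(amount)
// switches to the batch behavior whenever amount equals the batch size.
class BatchEquivalentHandler {
    private final int batchSize;
    int batchRequests = 0;
    int partialRequests = 0;

    BatchEquivalentHandler(int batchSize) { this.batchSize = batchSize; }

    int request(int amount) {
        if (amount == this.batchSize) {
            batchRequests++;   // batch-specific behavior would live here
        } else {
            partialRequests++; // partial-acquire behavior would live here
        }
        return amount; // real permit acquisition elided
    }

    int requestBatch() {
        return request(this.batchSize);
    }
}
```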

@tomazfernandes
Contributor

tomazfernandes commented Jan 3, 2025

Hey @loicrouchon and @jeroenvandevelde, lots of interesting points.

I have a question? If we introduce the CompositeBPH, do we need to keep the BatchAwareBackPressureHandler interface?

I think we still need the interface. What I'm thinking is - we can implement BPH or BABPH interfaces. The CompositeBPH should check which interface the instance implements and call the appropriate method.

What this means is - if knowing whether we're calling the batch or a smaller set is relevant to the implementation, we should use the BABPH. Otherwise, we can use the simple BPH.

We might look into adding a setBatchSize method to BABPH so we can automate setting batch size to relevant implementations, but perhaps we can start without it.

This means that in order to keep the current behavior of the SemaphoreBPH when wrapped in the CompositeBPH, the CompositeBPH needs to map requestBatch calls to requestBatch calls on each contained BPH.

Yup, that's what I'm thinking.

Here lies the problem: we should release the unused permits, but given they were acquired using requestBatch, we should use the releaseBatch method, which is not symmetric with release(batchSize).

Yeah, there's a lot of subtleties in this logic. Perhaps we could do something similar to what's done in AbstractPollingMessageSource where we check the amount of permits to decide which release method to call?

public Collection<Message<T>> releaseUnusedPermits(int permits, Collection<Message<T>> msgs) {
    if (msgs.isEmpty() && permits == this.backPressureHandler.getBatchSize()) {
        this.backPressureHandler.releaseBatch();
        logger.trace("Released batch of unused permits for queue {}", this.pollingEndpointName);
    }
    else {
        int permitsToRelease = permits - msgs.size();
        this.backPressureHandler.release(permitsToRelease);
        logger.trace("Released {} unused permits for queue {}", permitsToRelease, this.pollingEndpointName);
    }
    return msgs;
}

The first BPH#requestBatch returns less than the batchSize. In this case, we can downgrade calls on the next BPHs to request(amount), which might be fine (there might still be a problem with the release method, though).

Yeah, I thought of this, but I'm not sure how much more complexity this would introduce. Perhaps always requesting a batch would be simpler.

It might be better to introduce a getAvailablePermits method on the interface,
which the orchestrator of the BPHs would call, then request the lowest amount it retrieves from all the BPHs.

Yeah, I think this is an open question. Overall, I think if we can't release unused permits from the RateLimiter that might be acceptable - some other BPH has signaled the overall solution is under load and it might be ok to wait another cycle until it has permits again.

Introducing a getAvailablePermits might be useful, but while BPHs such as Semaphore and RateLimiter would never have fewer permits in the acquire stage than in getAvailablePermits, dynamic BPHs could, so that might introduce complexity.

Overall IMO unless we see a more compelling reason to introduce this 2-stage acquiring process we might want to go with the simpler 1-stage one.

But what I struggle with is the current high/low throughput mode of the SemaphoreBPH, as this one has logic, in high-throughput mode, to acquire all available permits in case it timed out when trying to acquire the full amount requested.

Yeah, the point here is that we should always try to fetch the whole batch since it's more efficient, so if we need to wait e.g. 1 second for it that's ok.

Overall, I understand there are some behaviors in SemaphoreBPH that might not be a perfect fit, as they are, for this cooperative behavior. But I think that as long as we call the appropriate methods (requestBatch, releaseBatch, request, release) it should work well enough, and we can work out any kinks as we go. We need to check this thoroughly, though.

It could in theory allow removing all semaphore logic from the BPHs and keeping it only in the composite one.

I think we should be careful here. Ideally, we should not need to change anything in the current SemaphoreBPH implementation, and the CompositeBPH should be as simple as possible, only containing the orchestration logic.

I think we have a few open questions to consider:

  1. Should we have a getAvailablePermits method to check available permits before acquiring?
  2. Should we always request a Batch from each BPH, or should we downgrade the calls if a BPH returns a smaller amount of permits?

IMO so far we should keep it simple - not introduce the getAvailablePermits method, and always request a batch of permits, but perhaps we can think about how each of these decisions would affect the implementations we have in mind.

@loicrouchon
Author

Hi again @tomazfernandes,

A few observations I made while digging into it today (please let me know if I got any wrong):

  • BPH#request(int amount) is never called in the whole project; only BABPH#requestBatch() is called, by AbstractPollingMessageSource#pollAndEmitMessages().
  • BABPH#releaseBatch() is only called if no permits were used. I assume this means the polling from SQS returned 0 messages (called in AbstractPollingMessageSource#releaseUnusedPermits).
  • BPH#release(int amount) is only called in three places:
    • Once in AbstractPollingMessageSource#releaseUnusedPermits with amount being permits - msgs.size(), so in the [0, permits) range. This corresponds to the case where some permits were used and some were not, and the unused ones are released. The 0 case seems weird; I don't remember seeing it while debugging, but reading the code, it seems it can happen.
    • Once in AbstractPollingMessageSource#pollAndEmitMessages, when the message source was stopped after permits were acquired, with the full amount of permits. So it is a kind of unused-permits use case.
    • Once in AbstractPollingMessageSource#releaseBackPressure with amount 1. From what I understand, this one is called once a message's processing is finished.

Looking at the SemaphoreBPH releaseBatch() and release(int amount) methods, it seems the logic for changing the throughput mode depends on an important notion: whether the release is due to unused permits or to a successful message processing.

  • For releaseBatch(), we know it's unused permits.
  • For release(int amount) with amount == 0 or amount > 1, we also know it's unused permits.
  • But for release(int amount) with amount == 1, we cannot know whether it's due to an unused permit or not.

So far my attempts to make it work are failing, and I'm 90% sure at this point that this is where the problem comes from.

  1. Should we have a getAvailablePermits method to check available permits before acquiring?

I'm trying to do without this at the moment, I think it can work.

  2. Should we always request a Batch from each BPH, or should we downgrade the calls if a BPH returns a smaller amount of permits?

I'm trying the downgrade at the moment, but I then faced the issue that the SemaphoreBPH#request(int amount) method has completely different logic from SemaphoreBPH#requestBatch(). Given that I could not find any place in the code base (except my changes) calling BPH#request(int amount), I'm replacing the SemaphoreBPH#request(int amount) implementation with the batch one, making the batch size a parameter.

I'm mostly experimenting at this stage and not committing to this. Yet even if we do not go for the downgrade, I think it's an important question to address: if BPH#request(int amount) is never called, what is the purpose of that method?

At the moment, I feel we could get away with:

  • Relying on the BPH interface only for use within the CompositeBPH.
    • AbstractPollingMessageSource#pollAndEmitMessages() would always call the CompositeBPH#requestBatch() method.
    • But CompositeBPH#requestBatch() would never call requestBatch() on the contained BPHs. It would call request(int amount).
  • Individual BPHs could switch to a batch logic, if it matters for their implementation, depending on how many permits are requested.
  • The BPH interface would need to introduce a releaseUnused(int amount) method, which I think would provide better semantics for BPH implementations to decide how to behave. This method would be used instead of releaseBatch() and release(int amount) for unused permits.

Let me know what you think. I'll resume digging into this next week.

@tomazfernandes
Contributor

tomazfernandes commented Jan 5, 2025

Hey @loicrouchon, excellent analysis, thanks!

But for release(int amount) with amount == 1, we cannot know whether it's due to an unused permit or not.

Yeah, for 1 it gets trickier. For the current logic I don't think it makes a difference: if it's not returning a whole batch, it should not switch to low throughput, whether the permit was unused or the message processed. And in case the batch size is equal to 1, the TP mode shouldn't matter anyway. But for other use cases I agree it might.

If BPH#request(int amount) is never called, what is the purpose of that method?

This logic changed a few times as it evolved. The first version had only a request() method, then release(amount), then the batch methods.

The Batch interface was introduced to try to isolate responsibilities - the MessageSource wouldn't know details about batch size and such, and the BPH wouldn't need to know about anything other than handling permits and batches of permits.

This kind of leaked when I had to add getBatchSize, called in the MessageSource to determine whether we were returning a full batch or not and therefore which method to call, so it was hard to hold the boundary there.

I don't have any strong opinions on some of the open questions, but here are some other thoughts for us to consider.

CompositeBPH

I think CompositeBPH should be entirely optional and an opt-in component. This way we keep logic as simple as possible for users that will use only one BPH which I believe will be the most common use-case, while also making a lot easier for us to release this without risking breaking existing functionality.

Ideally, we should be able to keep SemaphoreBPH with no or minimal changes, and make sure everything works the same way as they do today.

Release Unused Method

My initial idea with the Batch interface was to isolate BPH from knowing too many specifics regarding what permits are being used for, and keeping it agnostic. But as the logic for back pressure control became more complicated and nuanced, perhaps it makes sense to let BPH have more information so it can make more complex decisions.

For instance, it could also have a releaseOnError(int amount, Throwable error) method that could allow circuit-breaker-like implementations to reduce throughput in case error rate is too high, or maybe even stopping consumption altogether depending on the error. Wdyt?

We could have releaseOnSuccess, releaseUnused, and releaseOnError methods for fine-grained semantics, while keeping them optional so other BPH could simply use the one release method from the original interface.

I'm not sure what's the best way to introduce these methods. We might want to introduce a new interface extending BPH that could have these methods as default methods delegating to the original release(int amount) method, and, as you suggested before, deprecate the batch interface if it's not necessary anymore.
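That deprecation-via-defaults idea could be sketched like this (interface names are illustrative, not the actual API):

```java
// Hypothetical sketch: a sub-interface adding the fine-grained release
// methods as defaults that delegate to the original release(int amount),
// so existing implementations keep working unchanged.
interface ReleaseSketchHandler {
    void release(int amount);
}

interface FineGrainedReleaseSketchHandler extends ReleaseSketchHandler {
    default void releaseOnSuccess(int amount) {
        release(amount);
    }

    default void releaseUnused(int amount) {
        release(amount);
    }

    default void releaseOnError(int amount, Throwable error) {
        // A circuit-breaker-like implementation could inspect the error here.
        release(amount);
    }
}
```

An implementation that only cares about plain releases can stay a one-method lambda, while richer implementations override only the methods they need.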

OTOH, I think the batch concept also plays a role, and having a setBatchSize method could help us automate setting batch size for any relevant BPH implementations.

What I think we should keep in mind is - since this is a rather complex and sensitive part of the integration, ideally we should be able to make changes incrementally, keeping this PR as simple as possible, and following up with changes as required. Otherwise we might end up needing to make these changes as part of a Milestone version and it'd take longer to release it.

Let me know your thoughts, thanks!

@tomazfernandes
Contributor

One more thought.

Permits Downgrade

I think it's a good idea to try to include this. One use-case I can think of is - let's say we have a rate limiter and the SemaphoreBPH.

If the rate limiter limits our permits to 5, it doesn't make sense for the SemaphoreBPH to wait to try to acquire a full batch of e.g. 10 permits only to be limited to 5, in case 5 permits were immediately available.

The only thing for us to consider is if it would be simpler to first work only with batches and introduce this feature in a separate PR as an enhancement, but no strong opinions on this.
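A minimal sketch of this downgrade idea (all names hypothetical, not this PR's API): a composite handler lets each delegate shrink the amount granted by the previous one. A real implementation would also have to hand surplus permits back to the earlier delegates.

```java
import java.util.List;

// Hypothetical sketch of a composite back-pressure handler supporting
// permits downgrade: each delegate may grant fewer permits than requested,
// and the next delegate is only asked for what remains.
class CompositeSketch {
    interface Handler {
        int request(int amount); // returns the number of permits granted
    }

    private final List<Handler> delegates;

    CompositeSketch(List<Handler> delegates) {
        this.delegates = delegates;
    }

    int request(int amount) {
        int granted = amount;
        for (Handler delegate : delegates) {
            // Note: permits already granted by earlier delegates beyond the
            // final value would need to be released back to them.
            granted = Math.min(granted, delegate.request(granted));
        }
        return granted;
    }
}
```

With a semaphore-like delegate capping at 10 and a rate limiter capping at 5, a request for a batch of 10 is immediately downgraded to 5 instead of waiting for a full batch.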

@loicrouchon loicrouchon force-pushed the feature/backpressure-limiter branch from bb998d4 to 432d490 on January 7, 2025 16:57
@loicrouchon
Author

Hey @tomazfernandes, thanks for your inputs. I was able to resume and accommodate most of your comments (I think).

I managed to implement the CompositeBackPressureHandler, but in order to allow for permits downgrade, I had to distinguish between permit-release use cases such as:

  • Limited by another BackPressureHandler
  • None fetched from SQS (former releaseBatch)
  • Some fetched from SQS
  • Message processed

Without those distinctions, the throughput mode change was done in a way that was not compatible with the integration tests. All of this was fixed by removing the batch notion for releases and replacing it with a ReleaseReason enum, which gives more semantics while remaining compatible with the permits downgrade.

Here are the changes compared to the previous state.

  • Introduced the CompositeBackPressureHandler
  • Removed BatchAwareBackPressureHandler#getBatchSize() (not used anymore)
  • Removed BatchAwareBackPressureHandler#releaseBatch() (not used anymore)
  • Introduced BackPressureHandler.ReleaseReason enum
  • Updated the BackPressureHandler#release(int) signature to BackPressureHandler#release(int, ReleaseReason)
  • Adapted SemaphoreBackPressureHandler to make it compatible with the new release model (i.e. no more releaseBatch).
    • The throughput mode is only changed to high if at least one message was fetched from SQS (release with ReleaseReason#PARTIAL_FETCH)
    • The throughput mode is only changed to low if no messages were fetched from SQS (release with ReleaseReason#NONE_FETCHED)
    • The throughput mode is unchanged when permits are limited by another BackPressureHandler (release with ReleaseReason#LIMITED) or when messages are processed (release with ReleaseReason#PROCESSED)
    • Replaced the hasAcquiredFullPermits boolean with an int lowThroughputPermitsAcquired, as a low-throughput request can be made with a value different from batchSize (to allow pre or post permits downgrade)
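The resulting throughput-mode transitions can be sketched as follows (the ReleaseReason values are from this PR; the switch itself is a simplified illustration, not the actual SemaphoreBackPressureHandler code):

```java
// Simplified sketch of the ReleaseReason-driven throughput-mode switch.
enum ReleaseReason { LIMITED, NONE_FETCHED, PARTIAL_FETCH, PROCESSED }

enum ThroughputMode { LOW, HIGH }

class ThroughputModeSketch {
    static ThroughputMode next(ThroughputMode current, ReleaseReason reason) {
        switch (reason) {
            case PARTIAL_FETCH:
                return ThroughputMode.HIGH; // at least one message was fetched
            case NONE_FETCHED:
                return ThroughputMode.LOW;  // empty poll from SQS
            case LIMITED:                   // limited by another BPH
            case PROCESSED:                 // message processing finished
            default:
                return current;             // mode unchanged
        }
    }
}
```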

It's still experimental (I did not invest much time in configuring it in AbstractPipelineMessageListenerContainer#createBackPressureHandler), but I think it provides enough details for us to continue the discussion.

I'll be waiting for your feedback.

@tomazfernandes
Contributor

tomazfernandes commented Jan 15, 2025

Hi @loicrouchon, sorry, I still haven't had time to look too deeply into the new implementation, but I have a few comments to keep things moving:

  1. We must be mindful of breaking changes - we can't remove methods or interfaces. We can deprecate them instead, and / or overload existing methods.
  2. With the CompositeBPH, do we still need the BackPressureHandlerLimiter interface? Given that with the CBPH we can get the limiting functionality by introducing another BPH that will handle the limiting.
  3. The ReleaseReason is an interesting proposition, though I wonder what the tradeoffs are compared to having separate methods in a new interface. I'm thinking from the user side - they might be interested in implementing only one or two behaviors for a particular BPH, and we might save them from having to check the reason in their logic. The ReleaseReason approach seems more extensible, though.
  4. One other thing we could throw in would be having a separate BPH exclusively for managing High Throughput Mode and Low Throughput Mode. If you think this would make the rest of the solution simpler, it might be worth considering.

Please let me know your thoughts, thanks!

@loicrouchon
Author

Hey @tomazfernandes, thanks for the feedback

  1. We must be mindful of breaking changes - we can't remove methods or interfaces. We can deprecate them instead, and / or overload existing methods.

You're totally right, I overlooked this aspect. I'll add the methods back to the interface(s) and mark them as deprecated.

  2. With the CompositeBPH, do we still need the BackPressureHandlerLimiter interface? Given that with the CBPH we can get the limiting functionality by introducing another BPH that will handle the limiting.

BackPressureHandlerLimiter is not an interface in the latest changes, but a BatchAwareBackPressureHandler implementation. It relies on the BackPressureLimiter interface (to be implemented by client code) to get the current limit. The BackPressureHandlerLimiter then uses this limit to cap the concurrency using a reducible Semaphore. In other words, it limits the concurrency to the value obtained from the BackPressureLimiter.
I tried to implement it without a semaphore (returning the limit directly to the CompositeBPH), but it turned out it wasn't working so well because:

  • It would cause the SemaphoreBPH to request fewer permits
  • As the SemaphoreBPH is limited, it still has permits available
  • Meaning a new polling cycle starting immediately might still go through without waiting (if the limit is lower than the permits left available on the SemaphoreBPH)
  • So in this case, the limit does not limit the processing speed, but increases the number of polls. It would, however, correctly stop all processing if the returned limit were 0

I thought about the use cases for such a system (without a reducible semaphore in the BackPressureHandlerLimiter) and I couldn't find one where it made sense. Such an implementation feels like a good one at first sight, but I think it is bogus in practice. Now, I might lack imagination here, so please challenge me.

For these reasons, I thought the BackPressureHandlerLimiter needs to implement concurrency control within itself.
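The "reducible semaphore" mentioned above can be built on java.util.concurrent.Semaphore, whose reducePermits(int) is protected and therefore needs a small subclass (class name illustrative, not from this PR):

```java
import java.util.concurrent.Semaphore;

// Sketch of a semaphore whose permit count can be shrunk at runtime.
class ReducibleSemaphore extends Semaphore {
    ReducibleSemaphore(int permits) {
        super(permits);
    }

    // Removes permits without blocking; availablePermits() may go negative,
    // in which case subsequent release() calls first pay back the deficit.
    void reduce(int permits) {
        super.reducePermits(permits);
    }
}
```

A BackPressureHandlerLimiter can then call reduce() when BackPressureLimiter#limit() drops and release() when it rises, so in-flight work is drained before new polls are allowed through.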

Let me know if this makes sense or if you think such an implementation is too specific and should be provided by user code. If it is to be provided by user code, how should we handle integration testing of the CompositeBPH? Should this BackPressureHandlerLimiter implementation be moved to test sources (maybe simplified by merging the BackPressureLimiter into it)?

  3. The ReleaseReason is an interesting proposition, though I wonder what the tradeoffs are compared to having separate methods in a new interface. I'm thinking from the user side - they might be interested in implementing only one or two behaviors for a particular BPH, and we might save them from having to check the reason in their logic. The ReleaseReason approach seems more extensible, though.

I started with the release(int amount) and releaseBatch() methods. There were 3 release patterns.

  • Nothing fetched from SQS -> releaseBatch()
  • Partial fetch from SQS -> release(int amount) with amount in the [0, batchSize) interval
  • Processed message -> release(1)

Because of the introduction of the CompositeBPH and the limit notion, I started to replace those methods (not considering backward compatibility at this stage; this would come later) with the following:

  • Nothing fetched from SQS & partial fetch -> releaseUnused(int amount)
  • Processed message -> release(1)

But then, I realized that the nothing-fetched case is an important notion for switching the SemaphoreBPH throughput mode. So I introduced releaseNoneFetched(int amount).

Then again, I realized that releasing permits because of being limited by another BPH is different from a partial fetch and this could be a useful piece of information. At this point, I decided against introducing one more method (releaseLimited(int amount)) and went for the ReleaseReason.

Now regarding release(int amount, ReleaseReason reason), BPH implementations can ignore the reason if they are not concerned with it. That is what the BackPressureHandlerLimiter does.

Regarding backward compatibility, we could have:

```java
@Deprecated // will not be called
default void release(int amount) { // on BackPressureHandler (BPH)
    release(amount, ReleaseReason.PROCESSED); // or empty if you prefer
}

@Deprecated // will not be called
default void releaseBatch() { // on BatchAwareBackPressureHandler (BABPH)
    release(getBatchSize(), ReleaseReason.NONE_FETCHED); // or empty if you prefer
}
```
  4. One other thing we could throw in would be having a separate BPH exclusively for managing High Throughput Mode and Low Throughput Mode. If you think this would make the rest of the solution simpler, it might be worth considering.

You mean one BPH for high throughput and another for low throughput? I’m struggling to see a solution that would not break the existing behavior provided by the SemaphoreBPH.

Let me know what you think with those additional details, thanks!

Labels
component: sqs SQS integration related issue