Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-20227: camel-kafka - Kafka offset advances when using Pausable … #14745

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

davsclaus
Copy link
Contributor

…EIP and messages are lost when resumed

Description

Target

  • I checked that the commit is targeting the correct branch (note that Camel 3 uses camel-3.x, whereas Camel 4 uses the main branch)

Tracking

  • If this is a large change, bug fix, or code improvement, I checked there is a JIRA issue filed for the change (usually before you start working on it).

Apache Camel coding standards and style

  • I checked that each commit in the pull request has a meaningful subject line and body.
  • I have run mvn clean install -DskipTests locally and I have committed all auto-generated changes

@davsclaus davsclaus requested a review from orpiske July 5, 2024 10:13
Copy link
Contributor

github-actions bot commented Jul 5, 2024

🌟 Thank you for your contribution to the Apache Camel project! 🌟

🤖 CI automation will test this PR automatically.

🐫 Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run

  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2.. to request a test from the test bot.

  • You can label PRs using build-all, build-dependents, skip-tests and test-dependents to fine-tune the checks executed by this PR.

  • Build and test logs are available in the Summary page. Only Apache Camel committers have access to the summary.

  • ⚠️ Be careful when sharing logs. Review their contents before sharing them publicly.

@davsclaus
Copy link
Contributor Author

Argh yeah this test is a bit wrong as you want to be able to consume all 15 messages, but in the middle the consumer is paused

@davsclaus davsclaus marked this pull request as draft July 5, 2024 10:16
@davsclaus davsclaus marked this pull request as ready for review July 5, 2024 10:40
@davsclaus
Copy link
Contributor Author

davsclaus commented Jul 5, 2024

okay I improved this so the pause and resume will not lose messages and that the pause is waiting a poll cycle, otherwise it keeps looping asap which is not desireable (eat cpu and goes too fast)

@orpiske
Copy link
Contributor

orpiske commented Jul 5, 2024

Cool. It's a holiday here, but I will try to take a look later today

@orpiske
Copy link
Contributor

orpiske commented Jul 5, 2024

At first it wasn't very clear to me how the user would be losing messages after the pause.

The KafkaConsumer documentation says that pause "Suspend fetching from the requested partitions. Future calls to poll(Duration) will not return any records from these partitions until they have been resumed using resume(Collection)".

But looking more closely to the reproducer, I think I kinda understand the problem ... It's in part because of a misuse of the API (and, poor documentation for this particular feature).

In particular, our documentation states:

"The pausable EIP is meant to be used as a support mechanism when there is an exception somewhere in the route that prevents the exchange from being processed. More specifically, the check called by the pausable EIP should be used to test for transient conditions preventing the exchange from being processed."

Specifically, the trigger for those transient problems should be pulled by an exception, so that we have a failure in the processing result. Otherwise, we never cause the pause, thus leading to them losing the messages.

So, to make it short: the reproducer from the user is wrong.

@orpiske
Copy link
Contributor

orpiske commented Jul 5, 2024

That said, I think we do have a room for improvement on the API. I am not sure if we should be allowing the code to resume the assignment if pause hasn't been called.

Maybe we could throw an exception explaining that this is a misuse of the API, and trying to force a resume without an underlying transient problem is not supported.

@davsclaus davsclaus marked this pull request as draft July 5, 2024 15:08
@davsclaus
Copy link
Contributor Author

Yeah okay that can be better explain in the docs / javadoc / and the API is using a predicate so an exception is not thrown.

If the intend is to pause/resume kafka via some logic then use RoutePolicy that is intended for that. There is a throttling policy out of the box, but you can make your own as well.

@orpiske
Copy link
Contributor

orpiske commented Jul 7, 2024

@davsclaus actually, this is different from pausing the route per se. The idea for the pausable API is to pause the Kafka consumer (so as not to cause a rebalance when there is a transient issue).

Nonetheless, let's see if we can brainstorm some improvements to this once you are back. There are some specific requirements for pausing the KafkaConsumer (such as: it must be done from the same thread as the one calling 'poll'), but if we could do this using the RoutePolicy, we could retire this API and make it simpler for the users.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants