
kafka(consumer): Document/clarify the consumer behavior, or implement re-processing if processor fails #118

Open
marclop opened this issue May 2, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@marclop
Contributor

marclop commented May 2, 2023

Description

Based on the guidance given in twmb/franz-go#440 (comment), we should decide what our AtLeastOnceDelivery guarantees are. Currently, it may be expected that a consumer retries the configured Processor when processing fails, but that is not the case.

AtLeastOnceDelivery re-delivery only happens when a consumer has fetched some records from Kafka but the process is terminated, crashes, or loses network connectivity. After the consumer misses enough check-ins, the Kafka broker considers that consumer instance in the consumer group dead and reassigns its partitions to other active consumers.

However, if the configured Processor fails to process even a single record, the commit for that fetch is skipped; the next Fetch may pull new records and commit the new offset, losing up to MaxPollRecords records for a partition (see the sketch after the configuration excerpt below).

// Processor that will be used to process each event individually.
// It is recommended to keep the synchronous processing fast and below the
// rebalance.timeout.ms setting in Kafka.
//
// The processing time of each processing cycle can be calculated as:
// record.process.time * MaxPollRecords.
Processor model.BatchProcessor

// MaxPollRecords defines an upper bound to the number of records that can
// be polled on a single fetch. If MaxPollRecords <= 0, defaults to 100.
//
// It is best to keep the number of polled records small or the consumer
// risks being forced out of the group if it exceeds rebalance.timeout.ms.
MaxPollRecords int
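
For reference, here is an illustrative sketch (not the actual apm-queue code) of a simplified poll/process/commit loop on franz-go's kgo client, assuming autocommit is disabled and with a hypothetical process function standing in for the configured Processor. It shows why skipping the commit after a failure still loses records: the skipped records are not re-fetched, and committing a later fetch advances the offset past them.

package consumer

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// consumeLoop is a simplified poll/process/commit loop; process stands in for
// the configured Processor. It assumes autocommit is disabled
// (kgo.DisableAutoCommit) so offsets are only committed explicitly.
func consumeLoop(ctx context.Context, client *kgo.Client, maxPollRecords int,
	process func(context.Context, *kgo.Record) error) {
	for ctx.Err() == nil {
		fetches := client.PollRecords(ctx, maxPollRecords)
		if fetches.IsClientClosed() {
			return
		}
		var processed []*kgo.Record
		failed := false
		fetches.EachRecord(func(r *kgo.Record) {
			if failed {
				return // the rest of the fetch is skipped after the first failure
			}
			if err := process(ctx, r); err != nil {
				failed = true
				return
			}
			processed = append(processed, r)
		})
		if failed {
			// The commit is skipped, but kgo will NOT re-deliver these records:
			// the next PollRecords returns later offsets.
			continue
		}
		// Committing records from a later fetch advances the committed offset
		// past any previously skipped records, so they are lost (up to
		// MaxPollRecords per partition).
		if err := client.CommitRecords(ctx, processed...); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}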

Proposed solution(s)

1. Document the current behavior, add logging

Implemented in #125

The simplest path forward is to continue processing the records that can be processed, and log those that cannot, instead of stopping the partition's processing at the failing offset and potentially losing the rest of the fetch. This leads to less data loss for AtMostOnceDelivery. Update the documentation accordingly.
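
A rough sketch of what this could look like (an assumption, not the exact change in #125): each record in the fetch is processed independently, failures are logged and dropped, and the fetch is committed regardless. process again stands in for the configured Processor.

package consumer

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// processFetch processes every record in a fetch, logging and dropping the
// ones that fail instead of aborting the whole fetch. The fetch is committed
// either way, so failed records are not re-delivered (at most once for those
// records).
func processFetch(ctx context.Context, client *kgo.Client, fetches kgo.Fetches,
	process func(context.Context, *kgo.Record) error) {
	fetches.EachRecord(func(r *kgo.Record) {
		if err := process(ctx, r); err != nil {
			log.Printf("dropping record topic=%s partition=%d offset=%d: %v",
				r.Topic, r.Partition, r.Offset, err)
		}
	})
	if err := client.CommitUncommittedOffsets(ctx); err != nil {
		log.Printf("commit failed: %v", err)
	}
}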

2. Implement some sort of per-partition processing retries

Instead of dropping the events that can't be processed, a bounded per-partition buffer could be kept and retried on a timer, or when there are no active records fetched for the partition.

This option increases the memory footprint of the consumer and is slightly more complicated to implement well.
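
As a rough illustration only (a hypothetical design, not an agreed implementation), the buffer could look something like this; the size limit and retry cadence are placeholders.

package consumer

import (
	"context"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// retryBuffer is a hypothetical bounded per-partition buffer for records that
// failed processing (option 2). Buffered records are retried on a timer; if a
// partition's buffer is full, the record is still dropped.
type retryBuffer struct {
	mu      sync.Mutex
	maxSize int
	pending map[string]map[int32][]*kgo.Record // topic -> partition -> records
}

func newRetryBuffer(maxSize int) *retryBuffer {
	return &retryBuffer{
		maxSize: maxSize,
		pending: make(map[string]map[int32][]*kgo.Record),
	}
}

// add stores a failed record, reporting whether it fit in the buffer.
func (b *retryBuffer) add(r *kgo.Record) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	parts := b.pending[r.Topic]
	if parts == nil {
		parts = make(map[int32][]*kgo.Record)
		b.pending[r.Topic] = parts
	}
	if len(parts[r.Partition]) >= b.maxSize {
		return false
	}
	parts[r.Partition] = append(parts[r.Partition], r)
	return true
}

// retryLoop periodically retries buffered records; records that still fail
// remain buffered for the next tick. This is where the extra memory footprint
// mentioned above comes from.
func (b *retryBuffer) retryLoop(ctx context.Context, interval time.Duration,
	process func(context.Context, *kgo.Record) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			b.mu.Lock()
			for topic, parts := range b.pending {
				for partition, records := range parts {
					remaining := records[:0]
					for _, r := range records {
						if err := process(ctx, r); err != nil {
							remaining = append(remaining, r)
						}
					}
					b.pending[topic][partition] = remaining
				}
			}
			b.mu.Unlock()
		}
	}
}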

@marclop marclop added the bug Something isn't working label May 2, 2023
@marclop marclop added this to the 8.9 milestone May 2, 2023
@axw
Member

axw commented May 3, 2023

Since we're not planning to use AtLeastOnceDelivery in the near future, I think (1) is preferable for now. Longer term I think we should restart the consumer on error, rewind to the last committed offset + 1, and start processing again; this might mean some events are reprocessed.
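
A minimal sketch of that idea (an assumption, not an agreed design), using a plain kgo client: on a processing error the client is closed without committing, and a new client rejoins the group and resumes from the last committed offset + 1, so some records may be reprocessed.

package consumer

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// runWithRestart restarts the consumer when processing fails: the client is
// closed without committing, and a new client rejoins the group and resumes
// from the last committed offset + 1, so some records may be reprocessed.
// Leaving and rejoining triggers a rebalance. consume is a hypothetical
// poll/process/commit loop that returns a non-nil error on processing failure.
func runWithRestart(ctx context.Context, brokers []string, group, topic string,
	consume func(context.Context, *kgo.Client) error) error {
	for ctx.Err() == nil {
		client, err := kgo.NewClient(
			kgo.SeedBrokers(brokers...),
			kgo.ConsumerGroup(group),
			kgo.ConsumeTopics(topic),
			kgo.DisableAutoCommit(),
		)
		if err != nil {
			return err
		}
		err = consume(ctx, client)
		client.Close()
		if err != nil {
			log.Printf("processing failed, rejoining group to rewind: %v", err)
			continue
		}
		return nil
	}
	return ctx.Err()
}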

marclop added a commit to marclop/apm-queue that referenced this issue May 3, 2023
Improves how `AtLeastOnceDelivery` is handled by the kafka consumer.
Previously, the consumer could discard up to `cfg.MaxPollRecords`, since
it incorrectly assumed that unacknowledged records would be re-delivered
by Kafka if not committed within a certain amount of time.

Now, it discards any records which can't be processed, but continues
processing the rest of the records in the fetch. It can still result in
some data loss, and is at the mercy of the failures returned by the
processor, but it's much better than previously.

Additionally, adds a few tests which cover both `AtLeastOnceDelivery`
and `AtMostOnceDelivery`, and a `TestGracefulShutdown` test.

Partially addresses elastic#118
Part of elastic#123

Signed-off-by: Marc Lopez Rubio <[email protected]>
@lahsivjar
Contributor

How about using a DLQ for this? We could have a single DLQ for all messages; a DLQ consumer would consume from the DLQ with a delay and put the messages back on the original topic for another retry.
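
Roughly something like this (the topic name and header key are assumptions, and ProduceSync is used only to keep the sketch short): the failed record is copied to a shared dead-letter topic, tagged with its original topic so that the delayed DLQ consumer knows where to re-produce it.

package consumer

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// sendToDLQ copies a record that failed processing to a shared dead-letter
// topic, carrying the original topic in a header so a delayed DLQ consumer
// can later re-produce it there for another attempt.
func sendToDLQ(ctx context.Context, producer *kgo.Client, failed *kgo.Record) error {
	dlqRecord := &kgo.Record{
		Topic: "apm-queue.dlq", // assumed shared DLQ topic
		Key:   failed.Key,
		Value: failed.Value,
		Headers: append(failed.Headers, kgo.RecordHeader{
			Key: "original-topic", Value: []byte(failed.Topic),
		}),
	}
	return producer.ProduceSync(ctx, dlqRecord).FirstErr()
}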

marclop added a commit that referenced this issue May 5, 2023
@marclop
Contributor Author

marclop commented May 5, 2023

Solution 1 has been implemented as part of #125.

@axw that sounds feasible. No action would be required from our side in terms of managing offsets, since we wouldn't have committed them. One drawback of that approach is that it will cause a rebalance (the cooperative sticky balancer makes it less impactful) and partitions to be shuffled. Something else to consider is how we'd manage cases where a record can't ever be processed, with the proposed approach. It would "poison" the consumers and cause quite a lot of load in Kafka, and disrupt live systems.

@lahsivjar I think that approach sounds easier to manage and handle. It does mean that we're writing to and reading from an additional topic, but it provides clean separation. It may also allow us to use record headers to handle current / max retries.

@axw
Member

axw commented May 8, 2023

Something else to consider is how we'd manage cases where a record can't ever be processed, with the proposed approach. It would "poison" the consumers and cause quite a lot of load in Kafka, and disrupt live systems.

Good point. The DLQ/re-enqueuing approach with an explicit retry count should work well for that. I think this is complementary, though? We could fail to re-enqueue, and if we want "at least once" then in this case we'll still need to reprocess the original message.

@lahsivjar
Contributor

It may also allow us to use record headers to handle current / max retries.

Good idea; we can probably call these queues retry queues and, after a certain number of retries, push the messages to a DLQ. Messages on the DLQ will not be processed and will need manual intervention.
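
As a sketch of that routing (header keys, topic names and the retry budget are all assumptions): while a record is under its retry budget, it is re-produced to its original topic with an incremented retry count; once the budget is exhausted, it is parked on the DLQ for manual intervention.

package consumer

import (
	"context"
	"strconv"

	"github.com/twmb/franz-go/pkg/kgo"
)

// reEnqueue routes a failed record: back to its original topic while under
// the retry budget, otherwise to the DLQ, carrying the retry count and the
// original topic in record headers.
func reEnqueue(ctx context.Context, producer *kgo.Client, r *kgo.Record, maxRetries int) error {
	retries := 0
	originalTopic := r.Topic
	for _, h := range r.Headers {
		switch h.Key {
		case "retries":
			retries, _ = strconv.Atoi(string(h.Value))
		case "original-topic":
			originalTopic = string(h.Value)
		}
	}
	target := originalTopic
	if retries >= maxRetries {
		target = "apm-queue.dlq" // assumed DLQ topic: manual intervention only
	}
	out := &kgo.Record{
		Topic: target,
		Key:   r.Key,
		Value: r.Value,
		Headers: []kgo.RecordHeader{
			{Key: "original-topic", Value: []byte(originalTopic)},
			{Key: "retries", Value: []byte(strconv.Itoa(retries + 1))},
		},
	}
	return producer.ProduceSync(ctx, out).FirstErr()
}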

I think this is complementary, though? We could fail to re-enqueue, and if we want "at least once" then in this case we'll still need to reprocess the original message.

Sounds correct to me. I was thinking that if we failed to re-enqueue a message, we would commit the offset up to the previous message and keep reprocessing the failed one. This is under the assumption that a failure to re-enqueue messages would be either a bug in the application or a Kafka/infrastructure-level failure, not something dependent on the input data. Such a case would require manual intervention.

@simitt simitt added enhancement New feature or request and removed bug Something isn't working labels Jun 8, 2023
@simitt
Contributor

simitt commented Jun 8, 2023

Relabeled to enhancement since solution 1 is implemented and we are not using AtLeastOnceDelivery for now.

@simitt simitt removed this from the 8.9 milestone Jun 9, 2023