
Infinite reconnect until BufferChunkOverflowError is resolved #511

Open
kuckjwi0928 opened this issue Jul 4, 2024 · 4 comments

kuckjwi0928 commented Jul 4, 2024

Describe the bug

When a BufferChunkOverflowError occurs, it is caught by the "unexpected error" handler, and the consumer reconnects indefinitely until the BufferChunkOverflowError is resolved.

I think BufferChunkOverflowError should be covered by BufferError so that the emit is retried without reconnecting the consumer. Is this a bug?

To Reproduce

Check the fluent-plugin-kafka log after a BufferChunkOverflowError occurs.

Steps to reproduce:

  1. Set the output plugin's buffer chunk_limit_size to 1.
  2. Send a message.
  3. Check the log.

Expected behavior

It looks like BufferChunkOverflowError should be treated as a BufferError, so that the emit is retried without restarting the consumer.

Your Environment

- Fluentd version: 1.16.2
- TD Agent version:
- fluent-plugin-kafka version: v0.19.2
- ruby-kafka version: 1.5.0
- Operating system:
- Kernel version: 

Your Configuration

<source>
  @type kafka_group
  consumer_group fluentd-consumer
  brokers kafka-controller-0.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.kafka.svc.cluster.local:9092
  topics test-topic
  format json
  offset_commit_interval 5
  offset_commit_threshold 100
</source>

<match test-topic>
  @type stdout
  <buffer>
    @type memory
    retry_max_times 3
    flush_mode interval
    flush_interval 1s
    flush_thread_interval 0.1
    flush_thread_burst_interval 0.01
    flush_thread_count 5
    chunk_full_threshold 0.1
    chunk_limit_size 1 # Intentionally tiny so that a BufferChunkOverflowError is raised.
  </buffer>
</match>

Your Error Log

2024-07-04 00:52:37 +0000 [info]: #1 Subscribe to topics matching the regex test-topic
2024-07-04 00:52:37 +0000 [warn]: #1 Re-starting consumer 2024-07-04 00:52:37 +0000
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="telemetry.ab"
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:111:in `block (2 levels) in process'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `each'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `block in process'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `each'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `process'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:885:in `emit_sync'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:196:in `emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="test-topic"
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
  2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [error]: #1 unexpected error during consuming events from kafka. Re-fetch events. error="Kafka::ProcessingError"
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:345:in `rescue in block (3 levels) in each_batch'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:338:in `block (3 levels) in each_batch'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
  2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [warn]: #1 Stopping Consumer
2024-07-04 00:52:45 +0000 [warn]: #1 Could not connect to broker. retry_time:0. Next retry will be in 30 seconds

Additional context

No response

kuckjwi0928 changed the title "Infinite reconnect on BufferChunkOverflowError" to "Infinite reconnect until BufferChunkOverflowError is resolved on BufferChunkOverflowError" Jul 4, 2024
kuckjwi0928 changed the title "Infinite reconnect until BufferChunkOverflowError is resolved on BufferChunkOverflowError" to "Infinite reconnect until BufferChunkOverflowError is resolved" Jul 4, 2024
daipom moved this to Triage in Fluentd Kanban Nov 25, 2024
Watson1978

It seems we may need to handle this exception...

Watson1978 moved this from Triage to To-Do in Fluentd Kanban Jan 22, 2025
daipom (Contributor) commented Jan 22, 2025

Thanks for your report, and sorry for the late response.

There may be a bug.

I would expect BufferChunkOverflowError to be handled correctly here:

def emit_events(tag, es)
  retries = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    raise ForShutdown if @consumer.nil?
    if @retry_emit_limit.nil?
      sleep 1
      retry
    end
    if retries < @retry_emit_limit
      retries += 1
      sleep 1
      retry
    else
      raise RuntimeError, "Exceeds retry_emit_limit"
    end
  end
end
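To make the retry behaviour above easier to reason about, here is a minimal, self-contained sketch that mirrors the shape of emit_events (minus the ForShutdown check). It is not the plugin's code: FakeBufferError, FlakyRouter, and emit_with_retry are hypothetical stand-ins for the buffer error, the event router, and the plugin method, and the sleep is injectable so the example runs instantly.

```ruby
# Hypothetical stand-in for the buffer error class the plugin rescues.
class FakeBufferError < StandardError; end

# Hypothetical router that fails with FakeBufferError for the first
# `failures` calls and succeeds afterwards.
class FlakyRouter
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def emit_stream(_tag, _es)
    @calls += 1
    raise FakeBufferError, "buffer full" if @calls <= @failures
    :ok
  end
end

# Same control flow as emit_events: retry on the buffer error, bounded by
# retry_emit_limit when it is set, unbounded when it is nil.
def emit_with_retry(router, tag, es, retry_emit_limit:, sleeper: ->(_s) {})
  retries = 0
  begin
    router.emit_stream(tag, es)
  rescue FakeBufferError
    if retry_emit_limit.nil?
      sleeper.call(1)
      retry
    end
    if retries < retry_emit_limit
      retries += 1
      sleeper.call(1)
      retry
    else
      raise RuntimeError, "Exceeds retry_emit_limit"
    end
  end
end

router = FlakyRouter.new(2)
result = emit_with_retry(router, "test-topic", [], retry_emit_limit: 3)
puts "#{result} after #{router.calls} calls" # prints "ok after 3 calls"
```

Only errors matching the rescue clause take this path; anything else propagates straight out of the method.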

However, Kafka::ProcessingError is somehow raised here:

begin
  @consumer.each_batch(**@fetch_opts) { |batch|
    if @tag_source == :record
      process_batch_with_record_tag(batch)
    else
      process_batch(batch)
    end
  }
rescue ForShutdown
rescue => e
  log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
  log.error_backtrace
  reconnect_consumer
end
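The backtrace hints at where the Kafka::ProcessingError comes from: the top frame is a rescue inside ruby-kafka's each_batch (consumer.rb:345), which appears to re-raise any error from the processing block as Kafka::ProcessingError. If BufferChunkOverflowError is not matched by rescue BufferError in emit_events, it would escape process_batch, get wrapped there, and land in the generic rescue => e above, which calls reconnect_consumer. The sketch below imitates that flow with hypothetical stand-ins (ChunkOverflow, ProcessingError, FakeConsumer, run_once); it does not use ruby-kafka itself.

```ruby
# Hypothetical stand-ins; these are not the ruby-kafka or Fluentd classes.
class ChunkOverflow < StandardError; end

class ProcessingError < StandardError
  attr_reader :topic, :partition

  def initialize(topic, partition)
    @topic = topic
    @partition = partition
    super() # no message, so #to_s returns the class name
  end
end

# Imitates the wrapping seen in the backtrace: any error raised by the
# caller's block is re-raised as ProcessingError. Ruby records the
# original exception in #cause automatically.
class FakeConsumer
  def each_batch
    batch = { topic: "test-topic", partition: 0 }
    begin
      yield batch
    rescue StandardError
      raise ProcessingError.new(batch[:topic], batch[:partition])
    end
  end
end

# Imitates the run loop: a generic rescue logs and "reconnects".
def run_once(consumer)
  consumer.each_batch { |_batch| raise ChunkOverflow, "record larger than chunk limit" }
  :done
rescue => e
  puts "unexpected error: #{e} (cause: #{e.cause.class})"
  :reconnect
end

puts run_once(FakeConsumer.new)
# prints "unexpected error: ProcessingError (cause: ChunkOverflow)"
# then "reconnect"
```

Because the wrapper carries no message of its own, the log shows only the class name, which matches error="Kafka::ProcessingError" in the report.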

At first glance, this appears to have nothing to do with BufferChunkOverflowError...

We need to investigate this phenomenon.

kuckjwi0928 (Author)

BufferError = if defined?(Fluent::Plugin::Buffer::BufferOverflowError)

Shouldn’t this code be designed to handle both BufferOverflowError and BufferChunkOverflowError?

In Fluentd, these error classes are defined here:
https://github.com/fluent/fluentd/blob/d35f5a11da0701f080b4af56a91fbe64dba74c29/lib/fluent/plugin/buffer.rb#L35
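To illustrate the point: in Fluentd's buffer.rb, BufferOverflowError and BufferChunkOverflowError are sibling subclasses of Fluent::Plugin::Buffer::BufferError. The sketch below rebuilds that hierarchy with local stand-in classes (it does not load Fluentd) and compares rescuing only the overflow class, which the plugin's BufferError alias appears to point to, with rescuing the shared base class. The Demo module and rescued_by? helper are hypothetical.

```ruby
# Local stand-ins mirroring the hierarchy in Fluentd's buffer.rb.
module Demo
  class BufferError < StandardError; end
  class BufferOverflowError < BufferError; end
  class BufferChunkOverflowError < BufferError; end
end

# Returns true if a `rescue handler_class` clause would catch error_class.
def rescued_by?(handler_class, error_class)
  begin
    raise error_class, "simulated"
  rescue handler_class
    true
  end
rescue StandardError
  false
end

narrow = Demo::BufferOverflowError # assumed target of the plugin's alias
broad  = Demo::BufferError         # the common base class

puts rescued_by?(narrow, Demo::BufferChunkOverflowError) # prints "false"
puts rescued_by?(broad,  Demo::BufferChunkOverflowError) # prints "true"
puts rescued_by?(broad,  Demo::BufferOverflowError)      # prints "true"
```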

daipom (Contributor) commented Feb 6, 2025

Oh! Thanks!
That is very old code.
It is likely the cause of this bug.
We should fix it.
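One possible shape for a fix, offered only as a sketch and not as the maintainers' patch: resolve the list of rescuable classes by name, skipping any that are not defined (in the spirit of the plugin's existing defined? check), and rescue with a splat. The Stub module, resolve_error_classes, BUFFER_ERRORS, and emit_once are hypothetical; in the plugin the names would be Fluentd's real constants. Note that, per the emit_events code above, when retry_emit_limit is unset the rescue branch retries forever, so a record that can never fit in a chunk would still stall consumption; this change only avoids the reconnect loop.

```ruby
# Stand-ins so the sketch runs without Fluentd loaded.
module Stub
  class BufferError < StandardError; end
  class BufferOverflowError < BufferError; end
  class BufferChunkOverflowError < BufferError; end
end

# Hypothetical helper: keep only the exception classes that exist in
# this process, silently skipping names that are not defined.
def resolve_error_classes(names)
  names.filter_map do |name|
    Object.const_get(name)
  rescue NameError
    nil
  end
end

# Assumed candidate list; the last name is deliberately undefined to
# show that missing classes are skipped.
BUFFER_ERRORS = resolve_error_classes(%w[
  Stub::BufferOverflowError
  Stub::BufferChunkOverflowError
  Stub::NotDefinedInThisVersion
]).freeze

# Hypothetical helper: any listed buffer error maps to :retry instead of
# escaping to the consumer loop; other errors still propagate.
def emit_once
  yield
  :emitted
rescue *BUFFER_ERRORS
  :retry
end

p BUFFER_ERRORS.size                                             # prints 2
p(emit_once { raise Stub::BufferChunkOverflowError, "too big" }) # prints :retry
p(emit_once { :fine })                                           # prints :emitted
```

Rescuing Fluentd's common BufferError base class would be the simpler alternative where it is available; the name-based list mainly matters if older Fluentd versions without one of these classes must still be supported.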

Projects
Status: To-Do
Development

No branches or pull requests

3 participants