Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_kafka: Add dynamic/static headers support #8583

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

DIFRIN
Copy link

@DIFRIN DIFRIN commented Mar 15, 2024

Currently,the output kafka plugin not support headers. The goal of this pull-request is to add support for dynamic/static headers to the output kafka plugin

No issue

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • docker run --rm --network kafka_default -e value_test='${text}' -it fluent/fluent-bit-local:latest -i dummy -p dummy='{"text": "hello world"}' -p samples=1 -o kafka -p brokers=kafka:9092 -p topics=test_fluentbit -p header="key2 <text>" -vvv
    -###################################################
    Fluent Bit v3.0.0
  • Copyright (C) 2015-2024 The Fluent Bit Authors
  • Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
  • https://fluentbit.io

. __ . __ ________
_ ____/| | __ __ ____ / | _ _
|/ |
___ _

| __) | | | | _/ __ \ / \ __\ | | / \ _\ \ / / ( <
| \ | |
| | /\ /| | \ | | | \ || | \ / /
_
/ |
/____/ _ >
| /| |
____ /||| _/ /______ /
/ / / / /

[2024/03/15 12:25:59] [ info] Configuration:
[2024/03/15 12:25:59] [ info] flush time | 1.000000 seconds
[2024/03/15 12:25:59] [ info] grace | 5 seconds
[2024/03/15 12:25:59] [ info] daemon | 0
[2024/03/15 12:25:59] [ info] ___________
[2024/03/15 12:25:59] [ info] inputs:
[2024/03/15 12:25:59] [ info] dummy
[2024/03/15 12:25:59] [ info] ___________
[2024/03/15 12:25:59] [ info] filters:
[2024/03/15 12:25:59] [ info] ___________
[2024/03/15 12:25:59] [ info] outputs:
[2024/03/15 12:25:59] [ info] kafka.0
[2024/03/15 12:25:59] [ info] ___________
[2024/03/15 12:25:59] [ info] collectors:
[2024/03/15 12:25:59] [ info] [fluent bit] version=3.0.0, commit=9d9ac68a2b, pid=1
[2024/03/15 12:25:59] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/03/15 12:25:59] [ info] [storage] ver=1.5.1, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/03/15 12:25:59] [ info] [cmetrics] version=0.7.0
[2024/03/15 12:25:59] [ info] [ctraces ] version=0.4.0
[2024/03/15 12:25:59] [ info] [input:dummy:dummy.0] initializing
[2024/03/15 12:25:59] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/03/15 12:25:59] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/03/15 12:25:59] [debug] [kafka:kafka.0] created event channels: read=23 write=24
[2024/03/15 12:25:59] [ info] [output:kafka:kafka.0] Starting kafka output init
[2024/03/15 12:25:59] [ info] [output:kafka:kafka.0] brokers='kafka:9092' topics='test_fluentbit'
[2024/03/15 12:25:59] [ info] [sp] stream processor started
[2024/03/15 12:26:00] [debug] [input chunk] update output instances with new chunk size diff=39, records=1, input=dummy.0
{"text"=>"hello world"}[2024/03/15 12:26:01] [debug] [task] created task=0x7f423f8366e0 id=0 OK
[2024/03/15 12:26:01] [debug] in produce_message

[2024/03/15 12:26:01] [debug] setting message headers
[2024/03/15 12:26:01] [debug] found header key2 with value
[2024/03/15 12:26:01] [debug] header key2 is part of the msg, field name :
[2024/03/15 12:26:01] [debug] key matches a field in the message
[2024/03/15 12:26:01] [ info] [output:kafka:kafka.0] Sending message completed
[2024/03/15 12:26:01] [debug] [output:kafka:kafka.0] enqueued message (53 bytes) for topic 'test_fluentbit'
[2024/03/15 12:26:01] [debug] [out flush] cb_destroy coro_id=0
[2024/03/15 12:26:01] [debug] [task] destroy task=0x7f423f8366e0 (task_id=0)

############################################################

  • Resulting records in kafka:
    kafka-console-consumer --topic test_fluentbit --bootstrap-server kafka:9092 --property print.headers=true
    key2:hello world {"@timestamp":1710505560.027627,"text":"hello world"}

  • [N/A ] Attached Valgrind output that shows no leaks or memory corruption was found

Documentation
[https://github.com/fluent/fluent-bit-docs/pull/1341]

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@DIFRIN DIFRIN force-pushed the output-kafka-add_headers branch 2 times, most recently from 627834f to 3d7ad58 Compare March 15, 2024 20:00
@DIFRIN DIFRIN closed this Mar 19, 2024
@DIFRIN DIFRIN reopened this Mar 19, 2024
@patrick-stephens
Copy link
Contributor

Can you sort the DCO failure? This cannot be merged without it

@DIFRIN
Copy link
Author

DIFRIN commented Mar 28, 2024

@patrick-stephens DCO added to the commits

@DIFRIN
Copy link
Author

DIFRIN commented Apr 2, 2024

@patrick-stephens @edsiper this is the message of the failed check

The following tests FAILED: 46 - flb-rt-out_http (Failed) Error: Process completed with exit code 8.

it's not linked to the PR content

@patrick-stephens
Copy link
Contributor

Possibly the test failure is related to #9023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants