Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_influxdb: Handle signed/unsigned integer as influx's integer of that representation #9301

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Aug 29, 2024

This is because in their line protocol, it needs to add "i" suffix for handling a value as an integer:

Datatype Element(s) Description
Integer Field values Signed 64-bit integers (-9223372036854775808 to 9223372036854775807). Specify an integer with a trailing i on the number. Example: 1i

So, we need to add "i" suffix for signed/unsigned integer values if needed.

ref: https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_reference/

Closes #9120.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
service:
    flush: 1
    log_level: debug

pipeline:
    inputs:
      - name: dummy
        dummy: "{\"key\":\"message\", \"number\":2}"

    outputs:
        - name: influxdb
          match: "*"
          host: localhost
          # add_integer_suffix: On
          Org: fluent
          database: fluent-bit
          auto_tags: on # Automatically tag keys where value is string. This option takes a boolean value: True/False, On/Off.
          # HTTP_Token: <awesome_token>
  • Debug log output from testing the change
Fluent Bit v3.1.7
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  __  
|  ___| |                | |   | ___ (_) |         |____ |/  | 
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`| | 
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \ | | 
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /_| |_
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)___/

[2024/08/30 17:42:17] [ info] Configuration:
[2024/08/30 17:42:17] [ info]  flush time     | 1.000000 seconds
[2024/08/30 17:42:17] [ info]  grace          | 5 seconds
[2024/08/30 17:42:17] [ info]  daemon         | 0
[2024/08/30 17:42:17] [ info] ___________
[2024/08/30 17:42:17] [ info]  inputs:
[2024/08/30 17:42:17] [ info]      dummy
[2024/08/30 17:42:17] [ info] ___________
[2024/08/30 17:42:17] [ info]  filters:
[2024/08/30 17:42:17] [ info] ___________
[2024/08/30 17:42:17] [ info]  outputs:
[2024/08/30 17:42:17] [ info]      influxdb.0
[2024/08/30 17:42:17] [ info] ___________
[2024/08/30 17:42:17] [ info]  collectors:
[2024/08/30 17:42:17] [ info] [fluent bit] version=3.1.7, commit=0d4d6bda2e, pid=21448
[2024/08/30 17:42:17] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2024/08/30 17:42:17] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/08/30 17:42:17] [ info] [cmetrics] version=0.9.4
[2024/08/30 17:42:17] [ info] [ctraces ] version=0.5.5
[2024/08/30 17:42:17] [ info] [input:dummy:dummy.0] initializing
[2024/08/30 17:42:17] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/08/30 17:42:17] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/08/30 17:42:17] [debug] [influxdb:influxdb.0] created event channels: read=23 write=24
[2024/08/30 17:42:17] [debug] [output:influxdb:influxdb.0] host=localhost port=8086
[2024/08/30 17:42:17] [ info] [sp] stream processor started
[2024/08/30 17:42:19] [debug] [task] created task=0x60000131c000 id=0 OK
[2024/08/30 17:42:19] [debug] [upstream] KA connection #31 to localhost:8086 is connected
[2024/08/30 17:42:19] [debug] [http_client] not using http_proxy for header
[2024/08/30 17:42:19] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/08/30 17:42:19] [debug] [upstream] KA connection #31 to localhost:8086 is now available
[2024/08/30 17:42:19] [debug] [out flush] cb_destroy coro_id=0
[2024/08/30 17:42:19] [debug] [task] destroy task=0x60000131c000 (task_id=0)
[2024/08/30 17:42:20] [debug] [task] created task=0x600001310000 id=0 OK
[2024/08/30 17:42:20] [debug] [upstream] KA connection #31 to localhost:8086 has been assigned (recycled)
[2024/08/30 17:42:20] [debug] [http_client] not using http_proxy for header
[2024/08/30 17:42:20] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/08/30 17:42:20] [debug] [upstream] KA connection #31 to localhost:8086 is now available
[2024/08/30 17:42:20] [debug] [out flush] cb_destroy coro_id=1
[2024/08/30 17:42:20] [debug] [task] destroy task=0x600001310000 (task_id=0)
[2024/08/30 17:42:21] [debug] [task] created task=0x60000131c000 id=0 OK
[2024/08/30 17:42:21] [debug] [upstream] KA connection #31 to localhost:8086 has been assigned (recycled)
[2024/08/30 17:42:21] [debug] [http_client] not using http_proxy for header
[2024/08/30 17:42:21] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/08/30 17:42:21] [debug] [upstream] KA connection #31 to localhost:8086 is now available
[2024/08/30 17:42:21] [debug] [out flush] cb_destroy coro_id=2
[2024/08/30 17:42:21] [debug] [task] destroy task=0x60000131c000 (task_id=0)
^C[2024/08/30 17:42:21] [engine] caught signal (SIGINT)
[2024/08/30 17:42:21] [ info] [input] pausing dummy.0
% influx query 'from(bucket: "fluent-bit") |> range(start: 2024-08-29T00:00:00Z, stop: now())'
Result: _result
Table: keys: [_start, _stop, _field, _measurement, _seq, key]
                   _start:time                      _stop:time           _field:string     _measurement:string             _seq:string              key:string                      _time:time                  _value:float
------------------------------  ------------------------------  ----------------------  ----------------------  ----------------------  ----------------------  ------------------------------  ----------------------------
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       0                 message  2024-08-30T08:10:35.384719000Z                             2
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       0                 message  2024-08-30T08:11:13.697479000Z                             2
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       0                 message  2024-08-30T08:42:13.262590000Z                             2
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       0                 message  2024-08-30T08:42:18.498846000Z                             2
Table: keys: [_start, _stop, _field, _measurement, _seq, key]
                   _start:time                      _stop:time           _field:string     _measurement:string             _seq:string              key:string                      _time:time                  _value:float
------------------------------  ------------------------------  ----------------------  ----------------------  ----------------------  ----------------------  ------------------------------  ----------------------------
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       1                 message  2024-08-30T08:10:36.383642000Z                             2
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       1                 message  2024-08-30T08:42:19.498964000Z                             2
Table: keys: [_start, _stop, _field, _measurement, _seq, key]
                   _start:time                      _stop:time           _field:string     _measurement:string             _seq:string              key:string                      _time:time                  _value:float
------------------------------  ------------------------------  ----------------------  ----------------------  ----------------------  ----------------------  ------------------------------  ----------------------------
2024-08-29T00:00:00.000000000Z  2024-08-30T08:43:37.233904000Z                  number                 dummy.0                       2                 message  2024-08-30T08:42:20.498893000Z                             2

To ingest already mapped as float field(s) with integer type, then got:

Fluent Bit v3.1.7
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  __  
|  ___| |                | |   | ___ (_) |         |____ |/  | 
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`| | 
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \ | | 
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /_| |_
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)___/

[2024/08/30 17:43:18] [ info] Configuration:
[2024/08/30 17:43:18] [ info]  flush time     | 1.000000 seconds
[2024/08/30 17:43:18] [ info]  grace          | 5 seconds
[2024/08/30 17:43:18] [ info]  daemon         | 0
[2024/08/30 17:43:18] [ info] ___________
[2024/08/30 17:43:18] [ info]  inputs:
[2024/08/30 17:43:18] [ info]      dummy
[2024/08/30 17:43:18] [ info] ___________
[2024/08/30 17:43:18] [ info]  filters:
[2024/08/30 17:43:18] [ info] ___________
[2024/08/30 17:43:18] [ info]  outputs:
[2024/08/30 17:43:18] [ info]      influxdb.0
[2024/08/30 17:43:18] [ info] ___________
[2024/08/30 17:43:18] [ info]  collectors:
[2024/08/30 17:43:18] [ info] [fluent bit] version=3.1.7, commit=0d4d6bda2e, pid=21954
[2024/08/30 17:43:18] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2024/08/30 17:43:18] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/08/30 17:43:18] [ info] [cmetrics] version=0.9.4
[2024/08/30 17:43:18] [ info] [ctraces ] version=0.5.5
[2024/08/30 17:43:18] [ info] [input:dummy:dummy.0] initializing
[2024/08/30 17:43:18] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/08/30 17:43:18] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/08/30 17:43:18] [debug] [influxdb:influxdb.0] created event channels: read=23 write=24
[2024/08/30 17:43:18] [debug] [output:influxdb:influxdb.0] host=localhost port=8086
[2024/08/30 17:43:18] [ info] [sp] stream processor started
[2024/08/30 17:43:20] [debug] [task] created task=0x6000037802c0 id=0 OK
[2024/08/30 17:43:20] [debug] [upstream] KA connection #31 to localhost:8086 is connected
[2024/08/30 17:43:20] [debug] [http_client] not using http_proxy for header
[2024/08/30 17:43:20] [error] [output:influxdb:influxdb.0] http_status=422
{"code":"unprocessable entity","message":"failure writing points to database: partial write: field type conflict: input field \"number\" on measurement \"dummy.0\" is type integer, already exists as type float dropped=1"}
[2024/08/30 17:43:20] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/08/30 17:43:20] [debug] [upstream] KA connection #31 to localhost:8086 is now available
[2024/08/30 17:43:20] [debug] [out flush] cb_destroy coro_id=0
[2024/08/30 17:43:20] [debug] [task] destroy task=0x6000037802c0 (task_id=0)
^C[2024/08/30 17:43:21] [engine] caught signal (SIGINT)
[2024/08/30 17:43:21] [ info] [input] pausing dummy.0

This is expected behavior because this indicates that the integer type of influxdb line protocol was used correctly.

  • Attached Valgrind output that shows no leaks or memory corruption was found

Without use_integer: Off (default value):

==1986869== 
==1986869== HEAP SUMMARY:
==1986869==     in use at exit: 0 bytes in 0 blocks
==1986869==   total heap usage: 3,543 allocs, 3,543 frees, 1,633,498 bytes allocated
==1986869== 
==1986869== All heap blocks were freed -- no leaks are possible
==1986869== 
==1986869== For lists of detected and suppressed errors, rerun with: -s
==1986869== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

With use_integer: On:

==1986130== 
==1986130== HEAP SUMMARY:
==1986130==     in use at exit: 0 bytes in 0 blocks
==1986130==   total heap usage: 3,566 allocs, 3,566 frees, 1,634,106 bytes allocated
==1986130== 
==1986130== All heap blocks were freed -- no leaks are possible
==1986130== 
==1986130== For lists of detected and suppressed errors, rerun with: -s
==1986130== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#1451

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 changed the title out_influxdb: Handle signed/unsigned integer as influx's integer of t… out_influxdb: Handle signed/unsigned integer as influx's integer of that representation Aug 29, 2024
…hat representation

This is because in their line protocol, it needs to add "i" suffix for
handling a value as an integer:

| Datatype | Element(s)   | Description |
|:--------:|:------------:|:------------|
| Integer | Field values | Signed 64-bit integers (-9223372036854775808 to 9223372036854775807). Specify an integer with a trailing i on the number. Example: 1i |

So, we need to add "i" suffix for signed/unsigned integer values.

ref: https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_reference/

Signed-off-by: Hiroshi Hatake <[email protected]>
@edsiper edsiper merged commit a6c73c8 into master Sep 6, 2024
43 of 44 checks passed
@edsiper edsiper deleted the cosmo0920-handle-integer-values-as-integer-representation-on-influxdb-line-protocol branch September 6, 2024 00:04
@edsiper
Copy link
Member

edsiper commented Sep 6, 2024

@cosmo0920 would you please submit a backport for 3.1 ?

@cosmo0920
Copy link
Contributor Author

cosmo0920 commented Sep 6, 2024

Sure, I will do.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Influxdb output does not handle integer value correctly
2 participants